cod*_*123 8 apache-kafka apache-kafka-streams
我正在研究一个用例,我创建了将数据从 mongo 发送到 elasticsearch 的管道。
Mongo -> Spring Boot -> Kafka -> Transformer(KStream) -> Kafka -> Consumer(发送到 Elastic Search。)
我必须计算记录从 Mongo 到 Elastic search 所花费的时间。我想到使用 Kafka Headers 并继续在下一个 Kafka Producer 中转发它们的值,最后通过从当前时间戳中减去来计算相同的值。
我可以从生产者发送标头,但如何从 Kafka Streams 发送相同的标头。在下面的代码中,我想发送在消费 inTopic 时获得的标头并将它们发送到 outTopic。
private StreamsBuilder buildStream(final String bootstrapServers, final String inTopic, final String outTopic) {
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> kStream = streamsBuilder.stream(inTopic);
kStream.filter(new Predicate<String, String>() {
public boolean test(String s, String s2) {
return true;
}
})
.to(outTopic);
return streamsBuilder;
}
Run Code Online (Sandbox Code Playgroud)
小智 2
您可以通过 ProcessorContext 访问处理器 API 中的标头,并将 DSL 和处理器 API 一起使用。
我要做的就是创建 Transformer 的实现并在transform()
. 这个问题似乎与您的问题类似,它有一个带有代码片段的答案,我相信这可能会有所帮助:Set timestamp in output with Kafka Streams
归档时间: |
|
查看次数: |
6495 次 |
最近记录: |