如何使用 KStream 发送标头

cod*_*123 8 apache-kafka apache-kafka-streams

我正在研究一个用例,我创建了将数据从 mongo 发送到 elasticsearch 的管道。

Mongo -> Spring Boot -> Kafka -> Transformer(KStream) -> Kafka -> Consumer(发送到 Elastic Search。)

我必须计算记录从 Mongo 到 Elastic search 所花费的时间。我想到使用 Kafka Headers 并继续在下一个 Kafka Producer 中转发它们的值,最后通过从当前时间戳中减去来计算相同的值。

我可以从生产者发送标头,但如何从 Kafka Streams 发送相同的标头。在下面的代码中,我想发送在消费 inTopic 时获得的标头并将它们发送到 outTopic。

private StreamsBuilder buildStream(final String bootstrapServers, final String inTopic, final String outTopic) {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream(inTopic);

    kStream.filter(new Predicate<String, String>() {
        public boolean test(String s, String s2) {
            return true;
        }
    })
    .to(outTopic);
    return streamsBuilder;
}
Run Code Online (Sandbox Code Playgroud)

小智 2

您可以通过 ProcessorContext 访问处理器 API 中的标头,并将 DSL 和处理器 API 一起使用。

我要做的就是创建 Transformer 的实现并在transform(). 这个问题似乎与您的问题类似,它有一个带有代码片段的答案,我相信这可能会有所帮助:Set timestamp in output with Kafka Streams