是否可以使用Kafka Streams访问邮件头?

Nat*_*les 9 java apache-kafka apache-kafka-streams

通过在Kafka 0.11中添加Headers到记录(ProducerRecordConsumerRecord),是否可以在使用Kafka Streams处理主题时获取这些标题?当调用类似mapon的方法时,KStream它提供了记录keyvalue记录的参数但我无法看到访问它的方法headers.如果我们能够map超越ConsumerRecords ,那就太好了.

恩.

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 
Run Code Online (Sandbox Code Playgroud)

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...
Run Code Online (Sandbox Code Playgroud)

Mat*_*Sax 13

从Streams API的2.0版开始,可以访问记录头.(参见KIP-244了解详情.)

可以通过处理器API访问记录的元数据(即,通过transform(),transformValues()process()),通过给定的"上下文"对象(参见https://docs.confluent.io/current/streams/developer-guide/processor-api.html #adcess-processor-context).

在2.0之前,上下文仅公开主题,分区,偏移量和时间戳---但实际上不会在这些旧版本中读取时由Streams删除的标头.

但是,DSL级别的元数据不可用.但是,还在进行扩展DSL的工作:https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams

  • 澄清 Matthias 所说的话:是的,Kafka Streams 中的 Processor API 使您可以访问记录元数据,例如主题名称、分区号、偏移量等。 Kafka Streams 中的 DSL 不允许您访问。但是因为你可以结合 Processor API 和 DSL,你仍然可以编写一个基于 DSL 的流处理应用程序,通过使用 DSL 的 `transform()` 或 `transformValues()` 函数来访问记录元数据,它允许你通过在处理器 API 的处理器/转换器中。 (2认同)
  • @MatthiasJ.Sax 对我来说仍然不是 100% 清楚:这是否意味着通过 Streams 1.0.1 无法通过 DSL 或处理器 API 访问消息的标头?我通过检查 ProcessorContext (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/ProcessorContext.html)来询问,我无法找到当前处理的消息的标头。 (2认同)