Nat*_*les 9 java apache-kafka apache-kafka-streams
通过在Kafka 0.11中添加Headers到记录(ProducerRecord和ConsumerRecord),是否可以在使用Kafka Streams处理主题时获取这些标题?当调用类似mapon的方法时,KStream它提供了记录key和value记录的参数但我无法看到访问它的方法headers.如果我们能够map超越ConsumerRecords ,那就太好了.
恩.
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
Run Code Online (Sandbox Code Playgroud)
像这样的东西会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
Run Code Online (Sandbox Code Playgroud)
Mat*_*Sax 13
从Streams API的2.0版开始,可以访问记录头.(参见KIP-244了解详情.)
可以通过处理器API访问记录的元数据(即,通过transform(),transformValues()或process()),通过给定的"上下文"对象(参见https://docs.confluent.io/current/streams/developer-guide/processor-api.html #adcess-processor-context).
在2.0之前,上下文仅公开主题,分区,偏移量和时间戳---但实际上不会在这些旧版本中读取时由Streams删除的标头.
但是,DSL级别的元数据不可用.但是,还在进行扩展DSL的工作:https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
| 归档时间: |
|
| 查看次数: |
4984 次 |
| 最近记录: |