如何使用Kafka Stream DSL使用处理器过滤密钥和值

Ton*_*ony 6 apache-kafka-streams

我有一个与StateStore交互的处理器,用于过滤并对消息执行复杂的逻辑.在process(key,value)context.forward(key,value)用来发送我需要的键和值的方法中.出于调试目的,我也打印出来.

我有一个KStream mergedStream,它来自另外两个流的连接.我想将处理器应用于该流的记录.我实现了这个:mergedStream.process(myprocessor,"stateStoreName")

当我启动这个程序时,我可以看到要打印到我的控制台的正确值.但是,如果我使用主题上mergedStream.to("topic")的值将mergedStream发送到主题,则不是我在处理器中转发的那个,而是原始的.

我使用kafka-streams 0.10.1.0.

将我在处理器中转发的值转换为另一个流的最佳方法是什么?

是否可以将Processor APIKStream DSL创建的流混合使用?

Mat*_*Sax 12

短:

要解决您的问题,您可以使用transform(...)而不是使用它来process(...)访问DSL中的Processor API.

:

如果您使用process(...)将处理器应用于流 - 但是,这是一个"终止"(或接收器)操作(其返回类型为void),即它不返回任何结果(此处"接收器"仅表示运算符没有后继 - 它并不意味着任何结果都写在某个地方!)

此外,如果你打电话mergedStream.process(...),mergedStream.to(...)你基本上分支和复制你的流并发送一个副本给每个下游运营商(即一个副本process和一个副本到to.

混合DSL和处理器API是绝对可能的(你已经做过了;)).但是,使用process(...)您无法forward(...)在DSL中使用消费者数据- 如果您想使用处理器API结果,则可以使用transform(...)而不是process(...).