Dim*_*der 3 java apache-kafka apache-kafka-streams
给定:带有KStream::transform
. 作为Transformer::transform
执行的一部分,从输入一 ( KeyValue<String, Message>
)生成多条消息。
我可能可以KeyValue<String, List<Message>>
从返回对象Transformer::transform
并将其应用flatMapValues
为拓扑中的下一个处理器来平整列表。但是我想知道是否可以ProcessorContext::forward
用于相同的目标,即
public KeyValue<String, Message> transform(String key, Message message) {
Iterable<Message> messages = generateMultipleFromOne(message);
messages.forEach(m->context.forward(key, m));
return null;
}
Run Code Online (Sandbox Code Playgroud)
来自Transformer.transform(K key, V value)的javadoc:
如果有多个输出记录应该向下游转发,
ProcessorContext.forward(Object, Object)
并且ProcessorContext.forward(Object, Object, To)
可以使用。请注意,返回一个新的 KeyValue 只是为了方便。同样可以通过使用
ProcessorContext.forward(Object, Object)
和返回 null来实现。
归档时间: |
|
查看次数: |
929 次 |
最近记录: |