给定:带有KStream::transform
. 作为Transformer::transform
执行的一部分,从输入一 ( KeyValue<String, Message>
)生成多条消息。
我可能可以KeyValue<String, List<Message>>
从返回对象Transformer::transform
并将其应用flatMapValues
为拓扑中的下一个处理器来平整列表。但是我想知道是否可以ProcessorContext::forward
用于相同的目标,即
public KeyValue<String, Message> transform(String key, Message message) {
Iterable<Message> messages = generateMultipleFromOne(message);
messages.forEach(m->context.forward(key, m));
return null;
}
Run Code Online (Sandbox Code Playgroud) 我觉得我可能缺少一些非常基本的东西,但我还是会问。
有多个分区的输入主题。我使用 selectKey 作为 DSL 拓扑的一部分。selectKey 始终返回相同的值。我的期望是,在 selectKey() 触发的内部重新分区之后,拓扑中的下一个处理器将在同一个分区上为同一个键调用。然而,transform() 的下一个处理器在不同分区上为相同的键调用。
拓扑:
Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream("in-topic", Consumed.with(Serdes.String(), new JsonSerde<>(CatalogEvent.class)))
.selectKey((k,v) -> "key")
.transform(() -> new Processor())
.print();
return builder.build();
}
Run Code Online (Sandbox Code Playgroud)
转换使用的处理器类
public class Processor implements Transformer<String, CatalogEvent, KeyValue<String, DispEvent>> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, DispEvent> transform(String key, CatalogEvent catalogEvent) {
System.out.println("key:" + key + " partition:" + context.partition());
return null;
}
@Override …
Run Code Online (Sandbox Code Playgroud)