在使用 Kafka Streams DSL 时,有没有办法使用相同的主题作为两个不同处理例程的源?
StreamsBuilder streamsBuilder = new StreamsBuilder();
// use the topic as a stream
streamsBuilder.stream("topic")...
// use the same topic as a source for KTable
streamsBuilder.table("topic")...
return streamsBuilder.build();
Run Code Online (Sandbox Code Playgroud)
上面的幼稚实现TopologyException在运行时抛出一个:无效拓扑:主题主题已经被另一个源注册。如果我们深入研究底层处理器 API,这是完全有效的。使用它是唯一的出路吗?
更新: 迄今为止我发现的最接近的替代方案:
StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<Object, Object> stream = streamsBuilder.stream("topic");
// use the topic as a stream
stream...
// create a KTable from the KStream
stream.groupByKey().reduce((oldValue, newValue) -> newValue)...
return streamsBuilder.build();
Run Code Online (Sandbox Code Playgroud)