小编shp*_*chu的帖子

使用 Kafka Streams DSL 多次使用相同的主题作为源

在使用 Kafka Streams DSL 时,有没有办法使用相同的主题作为两个不同处理例程的源?

StreamsBuilder streamsBuilder = new StreamsBuilder();

// use the topic as a stream
streamsBuilder.stream("topic")...

// use the same topic as a source for KTable
streamsBuilder.table("topic")...

return streamsBuilder.build();
Run Code Online (Sandbox Code Playgroud)

上面的幼稚实现TopologyException在运行时抛出一个:无效拓扑:主题主题已经被另一个源注册。如果我们深入研究底层处理器 API,这是完全有效的。使用它是唯一的出路吗?

更新: 迄今为止我发现的最接近的替代方案:

StreamsBuilder streamsBuilder = new StreamsBuilder();

final KStream<Object, Object> stream = streamsBuilder.stream("topic");

// use the topic as a stream
stream...

// create a KTable from the KStream
stream.groupByKey().reduce((oldValue, newValue) -> newValue)...

return streamsBuilder.build();
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
3344
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1