Kafka Streams - 处理器 API - 转发到不同的主题

xma*_*mar 5 apache-kafka apache-kafka-streams kafka-streams-scala

我有一个 Processor-API 处理器,它在内部转发到几个单独的接收器(想想事件分类器,尽管它在事件之间也有状态逻辑)。我正在考虑稍后将其中两个主题连接起来。一旦加入,我会将元素的更新(丰富)版本转发到我实际加入的那些主题。

如果在处理器 API 代码中转发到多个接收器(接收器 1、接收器 2),而这些接收器又被发送到主题,您将如何混合 DSL?

我想你可以创建单独的流,比如

val stream1 = builder.stream(outputTopic) 
val stream2 = builder.stream(outputTopic2)
Run Code Online (Sandbox Code Playgroud)

并从那里开始构建?然而,这会创建更多的子拓扑 - 这里的含义是什么?

另一种可能性是在处理器 API 中拥有自己的状态存储,并在同一个处理器中对其进行管理(我实际上正在这样做)。它增加了代码的复杂性,但不是更高效吗?例如,您可以删除不再使用的数据(一旦进行连接,您可以将新连接的数据转发到接收器,并且它不再符合连接条件)。还有其他效率问题吗?

Mat*_*Sax 5

StreamsBuilder最简单的方法可能是通过从 a 开始并使用来将处理器 API 与 DSL 混合transform()

StreamsBuilder builder = new StreamsBuilder()
KStream[] streams = builder.stream("input-topic")
                           .transform(/* put your processor API code here */)
                           .branch(...);

KStream joined = streams[0].join(streams[1], ...);
Run Code Online (Sandbox Code Playgroud)

也可以先将中间流写入主题并读回它们。您获得更多子拓扑的事实应该不用担心。

通过状态手动进行连接是可能的,但很难正确编码。如果可能,我建议使用 DSL 提供的连接运算符。

  • 需要相同的类型——如果您有不同的类型,您将用 POJO 包装所有这些类型,尽管它总是只有一个成员集...我不知道连接示例。但是,您可以使用 DSL 连接作为示例(DSL 内部编译为 PAPI:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams /kstream/internals/KStreamImpl.java#L657) (2认同)