我有一个 Processor-API 处理器,它在内部转发到几个单独的接收器(想想事件分类器,尽管它在事件之间也有状态逻辑)。我正在考虑稍后将其中两个主题连接起来。一旦加入,我会将元素的更新(丰富)版本转发到我实际加入的那些主题。
如果在处理器 API 代码中转发到多个接收器(接收器 1、接收器 2),而这些接收器又被发送到主题,您将如何混合 DSL?
我想你可以创建单独的流,比如
val stream1 = builder.stream(outputTopic)
val stream2 = builder.stream(outputTopic2)
Run Code Online (Sandbox Code Playgroud)
并从那里开始构建?然而,这会创建更多的子拓扑 - 这里的含义是什么?
另一种可能性是在处理器 API 中拥有自己的状态存储,并在同一个处理器中对其进行管理(我实际上正在这样做)。它增加了代码的复杂性,但不是更高效吗?例如,您可以删除不再使用的数据(一旦进行连接,您可以将新连接的数据转发到接收器,并且它不再符合连接条件)。还有其他效率问题吗?
我理解这两个之间的区别,但是,我似乎仍将其KTable
用作“默认值”,而不是真正知道何时更喜欢GlobalKTable
.
请分享您的经验,什么时候GlobalKTable
必须使用它,为什么不使用它等。
我正在尝试启动一个新的SBT Scala项目并在build.sbt
文件中包含以下内容:
name := "ScalaKafkaStreamsDemo"
version := "1.0"
scalaVersion := "2.12.1"
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.0.0"
Run Code Online (Sandbox Code Playgroud)
所以根据GitHub repo,在2.0.0中我应该看到我想要使用的Scala类/函数等,但它们似乎似乎不可用.在IntelliJ中我可以打开kafka-streams-2.0.0.jar
,但我没有看到任何Scala类.
我需要包含另一个JAR吗?
就在我们讨论额外JAR的问题时,是否有人知道我需要包含哪些JAR才能使用EmbeddedKafkaCluster
?