标签: kafka-streams-scala

Kafka Streams - 处理器 API - 转发到不同的主题

我有一个 Processor-API 处理器,它在内部转发到几个单独的接收器(想想事件分类器,尽管它在事件之间也有状态逻辑)。我正在考虑稍后将其中两个主题连接起来。一旦加入,我会将元素的更新(丰富)版本转发到我实际加入的那些主题。

如果在处理器 API 代码中转发到多个接收器(接收器 1、接收器 2),而这些接收器又被发送到主题,您将如何混合 DSL?

我想你可以创建单独的流,比如

val stream1 = builder.stream(outputTopic) 
val stream2 = builder.stream(outputTopic2)
Run Code Online (Sandbox Code Playgroud)

并从那里开始构建?然而,这会创建更多的子拓扑 - 这里的含义是什么?

另一种可能性是在处理器 API 中拥有自己的状态存储,并在同一个处理器中对其进行管理(我实际上正在这样做)。它增加了代码的复杂性,但不是更高效吗?例如,您可以删除不再使用的数据(一旦进行连接,您可以将新连接的数据转发到接收器,并且它不再符合连接条件)。还有其他效率问题吗?

apache-kafka apache-kafka-streams kafka-streams-scala

5
推荐指数
1
解决办法
2052
查看次数

使用 1 个分区时何时使用 GlobalKTable 而不是 KTable

我理解这两个之间的区别,但是,我似乎仍将其KTable用作“默认值”,而不是真正知道何时更喜欢GlobalKTable.

请分享您的经验,什么时候GlobalKTable必须使用它,为什么不使用它等。

apache-kafka-streams kafka-streams-scala

4
推荐指数
1
解决办法
1401
查看次数

如何将Kafka Streams的Scala API定义为build.sbt中的依赖项?

我正在尝试启动一个新的SBT Scala项目并在build.sbt文件中包含以下内容:

name := "ScalaKafkaStreamsDemo"
version := "1.0"
scalaVersion := "2.12.1"

libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))

libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.0.0"
Run Code Online (Sandbox Code Playgroud)

所以根据GitHub repo,在2.0.0中我应该看到我想要使用的Scala类/函数等,但它们似乎似乎不可用.在IntelliJ中我可以打开kafka-streams-2.0.0.jar,但我没有看到任何Scala类.

我需要包含另一个JAR吗?

就在我们讨论额外JAR的问题时,是否有人知道我需要包含哪些JAR才能使用EmbeddedKafkaCluster

scala apache-kafka apache-kafka-streams kafka-streams-scala

2
推荐指数
1
解决办法
632
查看次数