Mik*_*der 9 apache-kafka apache-kafka-streams
我有一个需要收听多个不同主题的应用程序; 每个主题都有关于如何处理消息的单独逻辑.我曾想过为每个KafkaStreams实例使用相同的kafka属性,但是我得到的错误如下所示.
错误
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
Run Code Online (Sandbox Code Playgroud)
代码(kotlin)
class KafkaSetup() {
companion object {
private val LOG = LoggerFactory.getLogger(this::class.java)
}
fun getProperties(): Properties {
val properties = Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
return properties
}
private fun listenOnMyTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
kStream.foreach { key, value -> LOG.info("do stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
private fun listenOnMyOtherTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")
kStream.foreach { key, value -> LOG.info("do other stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
}
Run Code Online (Sandbox Code Playgroud)
我发现这个引用表明你不能application.id用于多个主题,但我发现很难找到支持它的参考文档.该文档的application.id状态:
流处理应用程序的标识符.在Kafka集群中必须是唯一的.它用作1)默认的client-id前缀,2)成员资格管理的group-id,3)changelog主题前缀.
问题
application.id来启动两个KafkaStreams列出不同主题的流?如果是的话,怎么样?详情: kafka 0.11.0.2
Mat*_*Sax 19
Kafka Streams通过分区进行扩展,而不是主题.因此,如果您使用相同的方式启动多个应用程序,则application.id它们必须与它们订阅的输入主题及其处理逻辑相同.应用程序使用application.idas 形成一个使用者组group.id,因此输入主题的不同分区被分配给不同的实例.
如果您有不同的主题具有相同的逻辑,您可以一次订阅所有主题(在每个实例中您开始).缩放仍然基于分区.(它基本上是您输入主题的"合并".)
如果要通过主题进行扩展和/或具有不同的处理逻辑,则必须application.id对不同的Kafka Streams应用程序使用不同的.
| 归档时间: |
|
| 查看次数: |
7180 次 |
| 最近记录: |