将消息从一个Kafka群集流式传输到另一个群集

Tim*_*.G. 4 apache-kafka apache-kafka-streams

我目前正在尝试轻松地将消息从一个Kafka集群上的主题流式传输到另一个Kafka集群(远程 - >本地集群).
我们的想法是立即使用Kafka-Streams,这样我们就不需要复制本地集群上的实际消息,而只需将Kafka-Streams处理的"结果"处理到我们的Kafka-Topics中.

所以,让我们说WordCount演示在另一台PC上的一个Kafka-Instance上,而不是我自己的.我还在我的本地计算机上运行了Kafka-Instance.
现在我想让WordCount演示在主题("远程")上运行,其中包含应该计算单词的句子.
然而,计数应写入我本地系统的主题,而不是"远程"主题.

使用Kafka-Streams API是否可以这样做?
例如

val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic", 
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
    .groupBy((_, word) => word)
    .count("word-counts")

wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)

val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()
Run Code Online (Sandbox Code Playgroud)

非常感谢
- 蒂姆

Mat*_*Sax 6

Kafka Streams仅针对单个群集构建.

解决方法是使用foreach()或类似的实例化您自己的KafkaProducer写入目标集群.请注意,您自己的制作人必须使用同步写入!否则,如果发生故障,您可能会丢失数据.因此,它不是一个非常高效的解决方案.

最好将结果写入源集群并将数据复制到目标集群.请注意,您最有可能在源群集中使用更短的输出主题保留期,因为实际数据在目标群集中的保留时间更长.这允许您限制源群集上的所需存储.

编辑(回复以下来自@quickinsights的评论)

如果您的Kafka流服务停留时间比保留时间长,该怎么办?

这似乎是一个正交的问题,可以针对任何设计提出.应根据最大停机时间设置保留时间,以避免一般数据丢失.请注意,因为应用程序从/向源集群读取/写入,并且源集群输出主题可能以较短的保留时间进行配置,所以如果应用程序出现故障,则不会发生任何不良情况.将不处理输入主题,也不会生成新的输出数据.您可能只担心进入目标集群的复制管道出现故障的情况 - 您应该相应地在源集群中设置输出主题的保留时间,以确保不会丢失任何数据.

它还会将你写回Kafka的内容加倍.

是.它还增加了磁盘上的存储空间.它是应用程序弹性和运行时性能与集群负载之间的权衡(一如既往).你的选择.我个人建议采用上面指出的更具弹性的选项.扩展Kafka集群比处理应用程序代码中的所有弹性边缘情况更容易.

这似乎超级低效

这是个人判断.这是一种权衡,没有客观的对错.