Tim*_*.G. 4 apache-kafka apache-kafka-streams
我目前正在尝试轻松地将消息从一个Kafka集群上的主题流式传输到另一个Kafka集群(远程 - >本地集群).
我们的想法是立即使用Kafka-Streams,这样我们就不需要复制本地集群上的实际消息,而只需将Kafka-Streams处理的"结果"处理到我们的Kafka-Topics中.
所以,让我们说WordCount演示在另一台PC上的一个Kafka-Instance上,而不是我自己的.我还在我的本地计算机上运行了Kafka-Instance.
现在我想让WordCount演示在主题("远程")上运行,其中包含应该计算单词的句子.
然而,计数应写入我本地系统的主题,而不是"远程"主题.
使用Kafka-Streams API是否可以这样做?
例如
val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic",
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count("word-counts")
wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)
val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()
Run Code Online (Sandbox Code Playgroud)
非常感谢
- 蒂姆
Kafka Streams仅针对单个群集构建.
解决方法是使用foreach()或类似的实例化您自己的KafkaProducer写入目标集群.请注意,您自己的制作人必须使用同步写入!否则,如果发生故障,您可能会丢失数据.因此,它不是一个非常高效的解决方案.
最好将结果写入源集群并将数据复制到目标集群.请注意,您最有可能在源群集中使用更短的输出主题保留期,因为实际数据在目标群集中的保留时间更长.这允许您限制源群集上的所需存储.
编辑(回复以下来自@quickinsights的评论)
如果您的Kafka流服务停留时间比保留时间长,该怎么办?
这似乎是一个正交的问题,可以针对任何设计提出.应根据最大停机时间设置保留时间,以避免一般数据丢失.请注意,因为应用程序从/向源集群读取/写入,并且源集群输出主题可能以较短的保留时间进行配置,所以如果应用程序出现故障,则不会发生任何不良情况.将不处理输入主题,也不会生成新的输出数据.您可能只担心进入目标集群的复制管道出现故障的情况 - 您应该相应地在源集群中设置输出主题的保留时间,以确保不会丢失任何数据.
它还会将你写回Kafka的内容加倍.
是.它还增加了磁盘上的存储空间.它是应用程序弹性和运行时性能与集群负载之间的权衡(一如既往).你的选择.我个人建议采用上面指出的更具弹性的选项.扩展Kafka集群比处理应用程序代码中的所有弹性边缘情况更容易.
这似乎超级低效
这是个人判断.这是一种权衡,没有客观的对错.
| 归档时间: |
|
| 查看次数: |
1220 次 |
| 最近记录: |