Luk*_*ela 2 apache-kafka apache-kafka-streams
我们有以下问题:
我们想听某些Kafka主题并建立它的“历史记录”-因此,对于指定的键,提取一些数据,将其添加到该键的现有列表中(如果不存在,则创建一个新的键),然后将其放入另一个主题,它只有一个分区,并且高度压缩。另一个应用程序可以仅收听该主题并更新其历史记录列表。
我在想它如何适合Kafka流库。我们当然可以使用聚合:
msgReceived.map((key, word) -> new KeyValue<>(key, word))
.groupBy((k,v) -> k, stringSerde, stringSerde)
.aggregate(String::new,
(k, v, stockTransactionCollector) -> stockTransactionCollector + "|" + v,
stringSerde, "summaries2")
.to(stringSerde, stringSerde, "transaction-summary50");
Run Code Online (Sandbox Code Playgroud)
这会创建一个由Kafka支持的本地商店,并将其用作历史记录表。
我担心的是,如果我们决定扩展此类应用程序,则每个正在运行的实例都会创建一个新的主题${applicationId}-${storeName}-changelog(我假设每个应用程序都有不同的主题applicationId)。每个实例开始消耗输入主题,获取不同的键集并构建状态的不同子集。如果Kafka决定重新平衡,则某些实例将开始错过本地存储中的某些历史状态,因为它们会使用一组全新的分区。
问题是,如果我只是为每个正在运行的实例设置相同的applicationId,那么它最终是否应从同一实例(每个正在运行的实例具有相同的本地状态)重播所有数据?
小智 5
为什么要创建多个具有不同ID的应用程序来执行相同的工作?Kafka实现并行性的方法是通过任务:
通过将应用程序的处理器拓扑划分为多个任务,可以对其进行扩展。
更具体地说,Kafka Streams基于应用程序的输入流分区创建固定数量的任务,每个任务分配了来自输入流的分区列表(即Kafka主题)。分区对任务的分配永远不会改变,因此每个任务都是应用程序并行性的固定单元。
然后,任务可以根据分配的分区实例化其自己的处理器拓扑。它们还为其分配的每个分区维护一个缓冲区,并一次从这些记录缓冲区处理消息。结果,可以在没有人工干预的情况下独立且并行地处理流任务。
如果需要扩展应用程序,则可以启动运行相同应用程序(相同应用程序ID)的新实例,某些已经分配的任务将重新分配给新实例。库将自动处理本地状态存储的迁移:
当发生重新分配时,某些分区以及相应的任务(包括任何本地状态存储)将从现有线程“迁移”到新添加的线程。结果,Kafka Streams以Kafka主题分区的粒度有效地重新平衡了应用程序实例之间的工作负载。
我建议您看一下本指南。
我担心的是,如果我们决定扩展这样的应用程序,每个正在运行的实例都会创建一个新的支持主题 ${applicationId}-${storeName}-changelog(我假设每个应用程序都有不同的 applicationId)。每个实例开始使用输入主题,获取一组不同的键并构建不同的状态子集。如果 Kafka 决定重新平衡,一些实例将开始错过本地存储中的一些历史状态,因为它们获得了一组全新的分区来消费。
有些假设是不正确的:
因此,如果所有实例都使用相同的应用程序 ID,则所有正在运行的应用程序实例都将使用相同的更改日志主题名称,因此,您打算做什么,应该是开箱即用的。
| 归档时间: |
|
| 查看次数: |
1455 次 |
| 最近记录: |