KTables 如何获得它们的初始值?

jak*_*ker 2 java apache-kafka apache-kafka-streams

我最近一直在研究 Kafka Streams API,但在完全理解 KTables 时遇到了一些麻烦。我想我理解一般概念,但我在一些细节上挣扎。

在我的示例应用程序中,我获取一堆价格,然后使用 Kafka Streaming API 为压缩的Kafka 主题 ( Topic-A )生成每个产品的平均价格。我有第二项服务,我想对这些平均价格更新做出反应。所以在第二个服务中,我创建了一个KTableover Topic-A,我可以成功查询它的存储。

我的目标是让第二个服务流程实时响应这些平均价格,而且还可以按需访问每个产品的最新价值。我相信我可以使用 KTable 和 Store 来做到这一点。

起初,我相信:

  • KTable 由本地存储(RocksDB 实例)支持
  • 当 KTable 被初始化时,它会消耗整个Topic-A来构建它的 KTable

然而,似乎 KTables 是(或可以是?)由压缩的 change-log 支持

  1. 这是否意味着在初始化时,KTable 只需要为每个键消费最新的记录?

  2. 如果我运行第二个服务的多个实例,KTables 会共享更改日志吗?我想如果实例数量增加/减少,实例将需要更新它们的本地状态以考虑来自更多/更少分区的数据。

  3. 使用 GlobalKTable 会为我提供每个实例中可用的所有 K/V 对吗?

Dmi*_*sky 5

  1. 这是否意味着在初始化时,KTable只需要消耗每个键的最新记录?

是的。如果底层主题中的数据使得每个值代表该键的完整最新值,则可以配置主题cleanup.policy=compact,Kafka Streams 只需要读取最新值即可恢复KTable(这是一个 RocksDB 存储)。在数据建模方面,这是唯一一种您想要/有意义用作KTable.

  1. 如果我运行第二个服务的多个实例,是否KTables共享更改日志?

是的,他们从同一个变更日志主题中读取,但他们根据state.dir您在 Kafka Streams 配置中提供的参数生成自己的 RocksDB 存储。

  1. 使用 aGlobalKTable会给我每个实例中可用的所有 K/V 对吗?

是的,但GlobalKTables与常规KTables. 我相信新的 1.0.0 版本已经为 增加了功能GlobalKTables,但它们仍然有一些限制。