Kafka Streams KTable Store在这种情况下对于紧凑的输入主题不是很有用,替代方法?

cod*_*ent 2 apache-kafka apache-kafka-streams

我正在为事件源应用程序建模,遇到概念上的疑问,我将使用一个典型的购物域来显示它:

假设一个客户主题接收到以下类型的事件:

CustomerCreated id = x, name= xxx, address = xxx
CustomerUpdated id = x, name = xxx
CustomerUpdated id = x, address = xxx
Run Code Online (Sandbox Code Playgroud)

请注意,更新事件不一定会更改/通知所有客户字段。

我正在使用KTable并使用其存储来运行交互式查询来实现此主题:

KTable<Integer, Customer> customers = builder.table(Topics.CUSTOMER.keySerde(), Topics.CUSTOMER.valueSerde(), Topics.CUSTOMER_STORE.name());
Run Code Online (Sandbox Code Playgroud)

假设会有很多客户,我想使用一个紧凑的客户主题。这无法恢复,因为压缩的主题带有中间消息,在我的情况下,该消息不能包含客户的全部信息(可能是带有部分信息的更新事件)。

根据KStreamBuilder.table的javadoc,创建的KTable存储不是更改日志,因此可以从原始主题中恢复。

The resulting KTable will be materialized in a local KeyValueStore with the given storeName. However, no internal changelog topic is created since the original input topic can be used for recovery 
Run Code Online (Sandbox Code Playgroud)

在我的情况下,我如何为客户提供一个紧凑的主题,同时又可以从该主题创建一个商店,并且可以利用客户的全部信息进行恢复?

Mat*_*Sax 8

正如您正确指出的那样,您的输入主题无法压缩,因为每个更新记录都被解释为对前一个记录的覆盖,因此必须是“完全”更新(changelog主题不支持“部分”更新)。

KTable如下方式读取主题将具有相同的语义,并将通过“ put”操作将主题具体化为键值存储(逻辑删除作为删除执行)。

如果要使用Kafka Streams进行部分更新,则可以通过以下方式使用聚合KStream

KTable table = builder.stream(...).groupByKey().aggregate(...);
Run Code Online (Sandbox Code Playgroud)

这使您可以使用Aggregator可以执行部分​​更新的自定义。对于每个输入记录,您将获得旧/当前状态和当前输入记录(即,可能的部分更新),并Aggregator返回新(更新)状态。这为您提供了最大的灵活性,您可以根据需要更新状态。

在这种情况下,无需压缩输入主题。结果KTable将由包含更新记录和状态完整副本的changelog主题支持。此changelog主题将自动配置日志压缩,因此永远不会丢失其状态。

您还可以将生成的changelog主题写入应该使用日志压缩配置的输出主题:

table.toStream().to(...);
Run Code Online (Sandbox Code Playgroud)

您可能想通过parameter禁用聚合步骤中的缓存Materialized。有关更多详细信息,请参阅文档:https : //docs.confluent.io/current/streams/developer-guide/memory-mgmt.html