cod*_*ent 2 apache-kafka apache-kafka-streams
我正在为事件源应用程序建模,遇到概念上的疑问,我将使用一个典型的购物域来显示它:
假设一个客户主题接收到以下类型的事件:
CustomerCreated id = x, name= xxx, address = xxx
CustomerUpdated id = x, name = xxx
CustomerUpdated id = x, address = xxx
Run Code Online (Sandbox Code Playgroud)
请注意,更新事件不一定会更改/通知所有客户字段。
我正在使用KTable并使用其存储来运行交互式查询来实现此主题:
KTable<Integer, Customer> customers = builder.table(Topics.CUSTOMER.keySerde(), Topics.CUSTOMER.valueSerde(), Topics.CUSTOMER_STORE.name());
Run Code Online (Sandbox Code Playgroud)
假设会有很多客户,我想使用一个紧凑的客户主题。这无法恢复,因为压缩的主题带有中间消息,在我的情况下,该消息不能包含客户的全部信息(可能是带有部分信息的更新事件)。
根据KStreamBuilder.table的javadoc,创建的KTable存储不是更改日志,因此可以从原始主题中恢复。
The resulting KTable will be materialized in a local KeyValueStore with the given storeName. However, no internal changelog topic is created since the original input topic can be used for recovery
Run Code Online (Sandbox Code Playgroud)
在我的情况下,我如何为客户提供一个紧凑的主题,同时又可以从该主题创建一个商店,并且可以利用客户的全部信息进行恢复?
正如您正确指出的那样,您的输入主题无法压缩,因为每个更新记录都被解释为对前一个记录的覆盖,因此必须是“完全”更新(changelog主题不支持“部分”更新)。
按KTable
如下方式读取主题将具有相同的语义,并将通过“ put”操作将主题具体化为键值存储(逻辑删除作为删除执行)。
如果要使用Kafka Streams进行部分更新,则可以通过以下方式使用聚合KStream
:
KTable table = builder.stream(...).groupByKey().aggregate(...);
Run Code Online (Sandbox Code Playgroud)
这使您可以使用Aggregator
可以执行部分更新的自定义。对于每个输入记录,您将获得旧/当前状态和当前输入记录(即,可能的部分更新),并Aggregator
返回新(更新)状态。这为您提供了最大的灵活性,您可以根据需要更新状态。
在这种情况下,无需压缩输入主题。结果KTable
将由包含更新记录和状态完整副本的changelog主题支持。此changelog主题将自动配置日志压缩,因此永远不会丢失其状态。
您还可以将生成的changelog主题写入应该使用日志压缩配置的输出主题:
table.toStream().to(...);
Run Code Online (Sandbox Code Playgroud)
您可能想通过parameter禁用聚合步骤中的缓存Materialized
。有关更多详细信息,请参阅文档:https : //docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
归档时间: |
|
查看次数: |
588 次 |
最近记录: |