相关疑难解决方法(0)

采用Kafka流的事件采购

我正在尝试在Kafka流之上实现一个简单的CQRS /事件源概念证明(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/中所述)

我有4个基本部分:

  1. commands topic,使用聚合ID作为按顺序处理每个聚合命令的键
  2. events主题,发布聚合状态的每个更改(同样,密钥是聚合ID).本主题的保留策略为"永不删除"
  3. 一个KTable,用于减少聚合状态并将其保存到状态存储

    events topic stream ->
    group to a Ktable by aggregate ID ->
    reduce aggregate events to current state ->
    materialize as a state store
    
  4. 命令处理器 - 命令流,左连接聚合状态KTable.对于结果流中的每个条目,使用函数(command, state) => events生成结果事件并将其发布到events主题

问题是 - 有没有办法确保我在州商店中拥有最新版本的聚合?

如果违反业务规则,我想拒绝命令(例如,如果实体被标记为已删除,则修改实体的命令无效).但是如果a DeleteCommand发布后跟ModifyCommand右后一个,则delete命令将生成DeletedEvent,但是当ModifyCommand处理时,来自状态存储的加载状态可能尚未反映,并且将发布冲突事件.

我不介意牺牲命令处理吞吐量,我宁愿得到一致性保证(因为所有内容都按相同的密钥分组,最终应该在同一个分区中)

希望很清楚:)有什么建议吗?

event-sourcing apache-kafka apache-kafka-streams

4
推荐指数
1
解决办法
1100
查看次数