采用Kafka流的事件采购

ami*_*ayh 4 event-sourcing apache-kafka apache-kafka-streams

我正在尝试在Kafka流之上实现一个简单的CQRS /事件源概念证明(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/中所述)

我有4个基本部分:

  1. commands topic,使用聚合ID作为按顺序处理每个聚合命令的键
  2. events主题,发布聚合状态的每个更改(同样,密钥是聚合ID).本主题的保留策略为"永不删除"
  3. 一个KTable,用于减少聚合状态并将其保存到状态存储

    events topic stream ->
    group to a Ktable by aggregate ID ->
    reduce aggregate events to current state ->
    materialize as a state store
    
  4. 命令处理器 - 命令流,左连接聚合状态KTable.对于结果流中的每个条目,使用函数(command, state) => events生成结果事件并将其发布到events主题

问题是 - 有没有办法确保我在州商店中拥有最新版本的聚合?

如果违反业务规则,我想拒绝命令(例如,如果实体被标记为已删除,则修改实体的命令无效).但是如果a DeleteCommand发布后跟ModifyCommand右后一个,则delete命令将生成DeletedEvent,但是当ModifyCommand处理时,来自状态存储的加载状态可能尚未反映,并且将发布冲突事件.

我不介意牺牲命令处理吞吐量,我宁愿得到一致性保证(因为所有内容都按相同的密钥分组,最终应该在同一个分区中)

希望很清楚:)有什么建议吗?

Con*_*enu 5

我不认为Kafka对CQRS和事件采购有好处,就像你描述的那样,因为它缺乏(简单)方法来确保防止并发写入.该文章有关本详细会谈.

所说的方式是指您希望命令生成零个或多个事件或因异常而失败的事实; 这是带有事件采购的经典CQRS.大多数人都期望这种架构.

您可以采用不同风格的事件采购.您的命令处理程序可以为每个接收到的命令(即DeleteWasAccepted)生成事件.然后,事件处理程序最终可以以事件源方式处理该事件(通过从其事件流重建Aggregate的状态)并发出其他事件(即ItemDeletedItemDeletionWasRejected).因此,命令被发送 - 忘记,发送异步,客户端不等待立即响应.然而,它等待一个描述其命令执行结果的事件.

一个重要的方面是事件处理程序必须以串行方式处理来自同一聚合的事件(恰好一次且按顺序).这可以使用单个Kafka Consumer Group来实现.您可以在此视频中看到此架构.