处理CQRS读取端的无序事件

Das*_*jes 10 cqrs event-sourcing event-store

我已经阅读了Jonathan Oliver关于处理无序事件的好帖子.

http://blog.jonathanoliver.com/cqrs-out-of-sequence-messages-and-read-models/

我们使用的解决方案是将消息出列并将其置于"保持表"中,直到收到具有先前序列的所有消息.当收到所有先前的消息后,我们将所有消息从保留表中取出,并通过适当的处理程序依次运行它们.一旦所有处理程序都成功执行,我们将从保留表中删除消息并将更新提交给读取的模型.

这适用于我们,因为域发布事件并使用适当的序列号标记它们.如果没有这个,下面的解决方案将变得更加困难 - 如果不是不可能的话.

此解决方案使用关系数据库作为持久性存储机制,但我们没有使用存储引擎的任何关系方面.与此同时,所有这一切都有一个警告.如果消息2,3和4到达但消息1从未到达,则我们不会应用其中任何消息.只有在出现错误处理消息1或消息1以某种方式丢失时,才会出现这种情况.幸运的是,很容易纠正我们的消息处理程序中的任何错误并重新运行消息.或者,在丢失消息的情况下,直接从事件存储重建构建读取模型.

得到了一些问题,尤其是关于他如何说我们总能向活动商店询问缺失的事件.

  1. CQRS的写入方是否必须为读取方提供服务以"请求"重放事件?例如,如果没有收到事件1,但是有2,4,3,我们可以通过服务要求eventstore从1开始重新发布事件吗?
  2. 此服务是CQRS写入方面的责任吗?
  3. 我们如何使用它重新构建读取模型?

Ill*_*llI 5

如果您有序列号,则可以检测当前事件无序的情况,例如 currentEventNumber != lastReceivedEventNumber + 1

一旦检测到这一点,您只需抛出异常即可。如果您的订阅者有“重试”机制,它将尝试在一秒钟左右再次处理此事件。在此期间,很可能会处理较早的事件并且顺序将是正确的。如果乱序事件很少发生,这是一个解决方案。

如果您经常遇到这种情况,则需要实现全局锁定机制,这将允许按顺序处理某些事件。例如,我们在 MSSQL 中使用 sp_getapplock 在某些情况下实现全局“临界区”行为。当分布式应用程序的多个部分需要的不仅仅是简单的锁时,Apache ZooKeeper 提供了一个框架来处理更复杂的场景。

  • 这个想法是在接收消息时使用[手动确认模式](http://prntscr.com/da8sar)。如果您的序列号无效 - 只需抛出异常,这意味着不会发生确认,这意味着消息将被重新传递。您可以通过在客户端实现中捕获-睡眠-重新抛出未处理的异常来解决延迟问题。这就是 dotnet 中的[确认发生](http://prntscr.com/da8un3)。屏幕截图来自[本教程页面](https://www.rabbitmq.com/dotnet-api-guide.html) (2认同)