Enr*_*one 2 masstransit servicebus rabbitmq cqrs event-sourcing
I'm new to Mass Transit and I would like to understand if it can helps with my scenario. I'm building a sample application implemented with a CQRS event sourcing architecture and I need a service bus in order to dispatch the events created by the command stack to the query stack denormalizers.
Let's suppose of having a single aggregate in our domain, let's call it Photo, and two different domain events: PhotoUploaded and PhotoArchived.
Given this scenario, we have two different message types and the default Mass Transit behaviour is creating two different RabbitMq exchanges: one for the PhotoUploaded message type and the other for the PhotoArchived message type.
Let's suppose of having a single denormalizer called PhotoDenormalizer: this service will be a consumer of both message types, because the photo read model must be updated whenever a photo is uploaded or archived.
Given the default Mass Transit topology, there will be two different exchanges so the message processing order cannot be guaranteed between events of different types: the only guarantee that we have is that all the events of the same type will be processed in order, but we cannot guarantee the processing order between events of different type (notice that, given the events semantic of my example, the processing order matters).
How can I handle such a scenario ? Is Mass Transit suitable with my needs ? Am I completely missing the point with domain events dispatching ?
免责声明:这不是您问题的答案,而是一个预防性消息,为什么您不应该执行计划要做的事情。
尽管RMQ之类的消息代理和MassTransit之类的消息中间件库非常适合集成,但我强烈建议您不要将消息代理用于事件源。我可以参考我的旧答案Event-sourcing:何时(不应该)使用Message Queue?这解释了背后的原因。
您发现自己的原因之一-事件顺序将永远无法保证。
另一个明显的原因是,根据通过消息代理发布的事件来构建读取模型,这有效地消除了重播的可能性,并消除了需要从头开始处理事件的新读取模型,但是它们得到的仅仅是正在出版现在。
聚合形成事务边界,因此每个命令都需要保证它在一个事务中完成。虽然MT支持事务中间件,但它仅保证您会获得支持它们的依赖项的事务,而不能保证context.Publish(@event)
在使用者体内获得依赖,因为RMQ不支持事务。您很有可能提交更改,而不会在读取端获取事件。因此,事件存储的经验法则是,您应该能够从存储中订阅更改流,而不是从代码中发布事件,除非这些事件是集成事件而不是域事件。
对于事件源,至关重要的是,每个读取模型在其计划的事件流中保留自己的检查点。消息代理没有提供这种功能,因为“检查点”实际上是您的队列,并且一旦消息从队列中消失了-它永远消失了,就再也没有回来。
关于实际问题:
您可以使用消息拓扑配置为不同的消息设置相同的实体名称,然后将它们发布到同一交换机,但这属于Chris在该页面上写的“滥用”类别。我没有尝试过,但是您绝对可以尝试。消息CLR类型是元数据的一部分,因此不应该存在反序列化问题。
但是同样,将消息放入同一交换不会为您提供任何订购保证,除非所有消息都将排入使用服务的队列中。
您至少必须根据您的聚合ID设置分区过滤器,以防止并行处理同一聚合的多个消息。顺便说一下,这对于集成也很有用。我们就是这样做的:
void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
=> ep.Handler<T>(
c => appService.Handle(c, aggregateStore),
hc => hc.UsePartitioner(8, partition));
AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
335 次 |
最近记录: |