在研究了大量材料和示例之后,我目前正在开始第一次尝试DDD/CQRS/ES系统.
1)我已经看到了事件源代码示例,其中Aggregates是事件处理程序,并且每个事件的Handle方法是改变对象实例上的状态(它们为将改变状态的事件实现IHandleEvent <EventType>接口)
2)我还看到了一些例子,其中Aggregates看起来就像是对域建模的普通经典实体类.另一个事件处理程序类涉及改变状态.
当然,当从存储库调用重建聚合时,状态由事件处理程序在聚合上进行突变,该存储库调用获取该聚合的所有先前事件,并且当命令处理程序调用聚合上的方法时.虽然在后者中我看过事件在命令处理程序而不是聚合中发布的例子,但我确信这是错误的.
我的问题是方法(1)和(2)之间的利弊是什么
我已经开始在.Net中创建一个测试应用程序,它使用Greg Young的EventStore作为CQRS/ES的后备存储.
为了便于加载完整聚合,我保存到名为"agg-123"的流.例如,对于id为553的产品聚合,会有一个名为"product-553"的流.然后,对于"Order"聚合,该流将被命名为"order-123".
从补充和保存事件来看,这很有效.
我现在正在尝试创建一个侦听器,它将侦听某些流,然后填充查询数据库.我看到的订阅方法似乎只能订阅"order-123"或"all".我看不出如何订阅"product-"或"order-",或两者兼而有之.
我想也是
有人建议吗?
我已经阅读了Jonathan Oliver关于处理无序事件的好帖子.
http://blog.jonathanoliver.com/cqrs-out-of-sequence-messages-and-read-models/
我们使用的解决方案是将消息出列并将其置于"保持表"中,直到收到具有先前序列的所有消息.当收到所有先前的消息后,我们将所有消息从保留表中取出,并通过适当的处理程序依次运行它们.一旦所有处理程序都成功执行,我们将从保留表中删除消息并将更新提交给读取的模型.
这适用于我们,因为域发布事件并使用适当的序列号标记它们.如果没有这个,下面的解决方案将变得更加困难 - 如果不是不可能的话.
此解决方案使用关系数据库作为持久性存储机制,但我们没有使用存储引擎的任何关系方面.与此同时,所有这一切都有一个警告.如果消息2,3和4到达但消息1从未到达,则我们不会应用其中任何消息.只有在出现错误处理消息1或消息1以某种方式丢失时,才会出现这种情况.幸运的是,很容易纠正我们的消息处理程序中的任何错误并重新运行消息.或者,在丢失消息的情况下,直接从事件存储重建构建读取模型.
得到了一些问题,尤其是关于他如何说我们总能向活动商店询问缺失的事件.
我在服务器和多个客户端之间实现数据同步时遇到问题.我读到了关于事件采购的内容,我想用它来完成同步部分.
我知道这不是一个技术问题,而是一个概念问题.
我只是将所有事件发送到服务器,但客户端设计为不时脱机使用.
该服务器存储每个客户端都应该知道,它不会重播这些事件来服务于数据,因为主要目的是同步的客户端之间的事件,使他们能够在本地重播所有事件的所有事件.
该客户有它一个JSON店,也使所有的事件,并重建从存储/同步事件的所有不同的集合.
由于客户端可以脱机修改数据,因此具有一致的同步周期并不重要.考虑到这一点,服务器应该在合并不同事件时处理冲突,并在发生冲突时询问特定用户.
因此,对我来说主要的问题是确定客户端和服务器之间的差异,以避免将所有事件发送到服务器.我也遇到了同步过程的顺序问题:首先推送更改,首先提取更改?
我目前构建的是服务器端上的默认MongoDB实现,它在所有查询中隔离特定用户组的所有文档(目前仅处理身份验证和服务器端数据库工作).在客户端上,我构建了一个围绕NeDB存储的包装器,使我能够拦截所有查询操作,以创建和管理每个查询的事件,同时保持默认查询行为不变.我还通过实现客户端生成的自定义ID并且是文档数据的一部分来补偿neDB和MongoDB的不同ID系统,因此重新创建数据库不会弄乱ID(同步时,这些ID应该在所有客户中保持一致).
事件格式如下所示:
{
type: 'create/update/remove',
collection: 'CollectionIdentifier',
target: ?ID, //The global custom ID of the document updated
data: {}, //The inserted/updated data
timestamp: '',
creator: //Some way to identify the author of the change
}
Run Code Online (Sandbox Code Playgroud)
为了在客户端上节省一些内存,我将在特定数量的事件中创建快照,以便完全重放所有事件将更有效.
因此,要缩小问题范围:我能够在客户端重放事件,我还能够在客户端和服务器端创建和维护事件,在服务器端合并事件也应该不是问题.使用现有工具复制整个数据库不是一种选择,因为我只是同步数据库的某些部分(甚至不是整个集合,因为文档被分配了它们应该同步的不同组).
但我遇到的麻烦是:
我想问的另一个问题是,将更新直接存储在类似修订版的文档中是否更有效?
如果我的问题不清楚,重复(我发现了一些问题,但他们在我的情况下没有帮助我)或者缺少某些东西,请发表评论,我会尽可能地保持它,以保持简单,就像我一样只写下一切可以帮助你理解这个概念.
提前致谢!
流程经理是否使用关联ID或特定于聚合的标识来跟踪其管理的流程?
为了更清楚地举一个例子,请考虑Saga on Sagas上的图2 :
首先,Process Manager发送OrderConfirmed事件是对的吗?我(作为进程管理器)无法发送事件,只发出命令.还是我错了?
其次,流程管理器如何关联来自不同聚合的OrderCreated,SeatsReserved,PaymentReceived事件?是每个聚合符合(并复制)的相关ID,还是特定标识符(例如SeatsReserved具有引用订单聚合的订单ID)?
最后,如果是相关id的情况,谁创建它们?是发出命令的客户端,比如PlaceOrder(order_id, correlation_id)?是聚合接受命令PlaceOrder(order_id)然后发出OrderCreated(order_id, corr_id)事件吗?或者,是负责这个的流程经理(以某种方式)?或者,相关id可能与此无关?
谢谢你的帮助.
我一直在阅读有关事件溯源模式的信息,如果您想重建系统,它会非常有用。
但是,如果我需要在为新传入请求提供服务时运行事件重建怎么办?该场景是否有任何特定模式或最佳实践?
因此,不是安排系统停机时间,而是如何确保新传入的请求在重播时不会搞砸我的系统,因为事件同步和顺序对我的系统来说非常重要。它涉及更新依赖于事件序列的数据库记录。
有什么想法吗?
就我目前的一点点了解,“微服务”的核心概念之一就是它依赖于自己的数据库,独立于其他微服务。
深入探讨如何在微服务系统中处理分布式事务,最好的策略似乎是事件溯源模式,其核心是事件存储。
事件存储是否在不同的微服务之间共享?或者每个微服务都有多个独立的事件存储数据库和一个公共事件代理?
如果第一个选项是解决方案,使用 CQRS 我现在可以假设每个微服务的数据库都用作查询端,而共享事件存储在命令端。这是一个错误的假设吗?
既然我们在这个主题中:如果使用乐观锁定在 Stream 中进行并发写入,我必须重试多少次?
非常感谢您给我的每一条建议!
distributed-computing distributed-system event-sourcing microservices
我想知道我的两种方法中哪一种更合适,或者还有另一种方法吗?
UI发送HTTP请求到GATEWAYGATEWAY发送HTTP请求到?SERVICE A?SERVICE A返回SUCCESS或ERROREVENT STORE并发布到QUEUEPROJECTION DATABASE 已更新?SERVICES可能消费事件UI发送HTTP请求到GATEWAYGATEWAY 发布事件到 QUEUE?SERVICE A 消费事件EVENT STORE并发布到QUEUEPROJECTION DATABASE 已更新?SERVICES可能消费事件GATEWAY消费事件并将响应(SUCCESS或ERROR)发送到UI如果我误解了一些概念,我真的很抱歉,我对这种建筑风格比较陌生。
在此先感谢您的帮助!:)
什么是事件驱动设计和领域驱动设计?
在微服务中使用领域驱动设计、事件驱动设计有什么具体好处。
情况如下。共有三种服务,一种服务是事件源,并使用事件总线(如 Azure 服务总线或 ActiveMQ)向其他两种服务(订阅者)发布集成或通知事件(发件箱模式)。
此设计的灵感来自.NET 微服务 - 架构电子书 - 订阅事件。
我想知道如果这些事件之一由于错误而无法传递,或者事件处理只是没有正确实现,会发生什么。
c# integration publish-subscribe event-sourcing microservices
event-sourcing ×10
cqrs ×7
.net ×1
c# ×1
event-store ×1
gateway ×1
integration ×1
javascript ×1
mongodb ×1
node.js ×1
spring ×1
spring-boot ×1