我有一个项目,其设计或至少应根据众所周知的DDD原则.
返回 - DDD + CQRS +事件存储
UI - ngrx/store
我有很多问题要问,但现在我会坚持这两个:
a)订阅response.ok
b)听域名活动
c)触发持有创建/更新/删除对象的通用事件?
如何访问EventStore中的所有存储事件以重建我的读取模型?
GetFrom在问题J Oliver EventStore V2.0问题中提到了一个调用的方法,但我在从Wireup返回的接口"IStoreEvents"上找不到这个方法.
使用JOliver EventStore 3.0,开始使用简单的样本.
我有一个使用NServiceBus的简单pub/sub CQRS实现.客户端在总线上发送命令,域服务器接收并处理命令并将事件存储到事件存储库,然后由eventstore的调度程序在总线上发布.然后,读取模型服务器订阅这些事件以更新读取模型.没有什么花哨的,几乎是书本.
它正在工作,但只是在简单的测试中,当事件存储到EventStore时,我在域服务器上获得了大量的并发异常(间歇性).它正确地重试,但有时它会达到5次重试限制,并且命令最终会出现在错误队列中.
我在哪里可以开始调查以查看导致并发异常的原因?我删除了调度程序,只关注存储事件,它有同样的问题.
我正在使用RavenDB来持久化我的EventStore.我没有做任何花哨的事,只是这样:
using (var stream = eventStore.OpenStream(entityId, 0, int.MaxValue))
{
stream.Add(new EventMessage { Body = myEvent });
stream.CommitChanges(Guid.NewGuid());
}
Run Code Online (Sandbox Code Playgroud)
异常的堆栈跟踪如下所示:
2012-03-17 18:34:01,166 [Worker.14] WARN NServiceBus.Unicast.UnicastBus [(null)] <(null)> - EmployeeCommandHandler处理消息失败.EventStore.ConcurrencyException:抛出了类型'EventStore.ConcurrencyException'的异常.at EventStore.OptimisticPipelineHook.PreCommit(提交尝试)在c:\ Code\public\EventStore\src\proj\EventStore.Core\OptimisticPipelineHook.cs:第55行在EventStore.OptimisticEventStore.Commit(提交尝试)在c:\ Code\public\EventStore\src\proj\EventStore.Core\OptimisticEventStore.cs:在C:\ Code\public\EventStore\src\proj\EventStore.Core\OptimisticEventStream.cs中的EventStore.OptimisticEventStream.PersistChanges(Guid commitId)的第90行:第168行在EventStore.OptimisticEventStream.CommitChanges(Guid commitId)的c:\ Code\public\EventStore\src\proj\EventStore.Core\OptimisticEventStream.cs:第149行,在CQRSTest3.Domain.Extensions.StoreEvent(IStoreEvents eventStore,Guid entityId) ,对象evt)在C:\ dev\test\CQRSTest3\CQRSTest3.Domain\Extensions.cs:第13行,位于C:\ dev\test\CQRSTest3\CQRSTest3中的CQRSTest3.Domain.ComandHandlers.EmployeeCommandHandler.Handle(ChangeEmployeeSalary message). Domain\ComandHandlers\Emplo yeeCommandHandler.cs:第55行
基线信息: 我正在使用外部OAuth提供程序进行登录.如果用户登录到外部OAuth,则可以进入我的系统.但是,我的系统中可能尚不存在此用户.这不是一个真正的技术问题,但我正在使用JOliver EventStore来实现它的价值.
逻辑:
问题: 假设在读取模型更新之前以某种方式发出了两个创建命令,这是由于浏览器刷新或在实现与读取模型的一致性之前发生的一些其他异常.没关系,这不是我的问题.
会发生什么: 因为新ID是Guid梳子,事件存储器不可能知道这两个CreateUser命令代表同一个用户.当他们到达读取模型时,读取模型将知道(因为他们具有相同的电子邮件)并且可以合并两个记录或采取一些其他补偿操作.但是现在我的阅读模型与事件存储不同步,事件存储仍然认为这些是两个独立的实体.
也许这无关紧要,因为:
任何人都能说明他们如何处理类似的问题吗?如果需要进行某些补偿操作,那么当读取模型服务意识到它有重复的条目时会发出某种补偿命令吗?有没有更简单的方法我不考虑?
谢谢!
是否有可能不是通过StreamId搜索流,而是通过其他Stream属性搜索?例如,如果每个流在Headers中都有CustomerId,我想搜索具有特定CustomerId的所有流.
不熟悉域驱动设计的所有细节,在微服务架构中将每个服务视为自己的域并依次为每个服务构建一个事件存储是否有意义?
不完全确定整个系统的单个整体事件存储可能会有什么权衡.例如,在系统中重放条件或调试跨服务依赖性更困难.
我有一个使用Jonathan Oliver的CommonDomain和EventStore的小系统.
我如何对我的聚合根进行单元测试以验证是否引发了正确的事件?
考虑遵循聚合根:
public class Subscriber : AggregateBase
{
private Subscriber(Guid id)
{
this.Id = id;
}
private Subscriber(Guid id, string email, DateTimeOffset registeredDate)
: this(id)
{
this.RaiseEvent(new NewSubscriberRegistered(this.Id, email, registeredDate));
}
public string Email{ get; private set; }
public DateTimeOffset RegisteredDate { get; private set; }
public static Subscriber Create(Guid id, string email, DateTimeOffset registeredDate)
{
return new Subscriber(id, email, registeredDate);
}
private void Apply(NewSubscriberRegistered @event)
{
this.Email = @event.Email;
this.RegisteredDate = @event.RegisteredDate;
} …Run Code Online (Sandbox Code Playgroud) 我正在尝试学习EventStore,我喜欢这个概念,但是当我尝试在实践中应用时,我会陷入同样的困境.我们来看看代码:
foreach (var k in stream.CommittedEvents)
{
//handling events
}
Run Code Online (Sandbox Code Playgroud)
两个问题:
当应用程序在某些维护后启动时,我们如何以安全的方式书签哪些事件开始读取?有使用的模式吗?
一旦事件全部被消耗,循环结束......消息到达运行时间怎么样?我希望呼叫阻塞,直到一些新消息到达(当然需要在一个线程中处理)或者有类似BeginRead EndRead的东西.
我是否必须绑定ESB来处理运行时事件,或者EventSore是否提供了一些工具来执行此操作?
我试图用一个例子更好地解释 假设聚合是一个金融投资组合,而应用程序是一个向交易者展示该投资组合的应用程序.假设交易者连接到Web应用程序,他会查看自己的投资组合.当前状态将是整个历史记录,因此我必须阅读大量记录以重现状态.我想这可以通过一个所谓的快照完成,但是谁负责创建它?什么时候应该选择创建聚合?怎么能猜出聚合的快照存在?对于运行时部分:一旦用户查看重建的投资组合状态,实时部分就开始运行.用户可以通过在市场中成功执行该订单来下订单并创建新的头寸.基础设施如何更新投资组合?我希望,但也许我完全错了,让相同的事件流成为新事件新长位置的来源,否则我有两条路径处理同一聚合的状态.我想知道这是策略应该如何工作,即使我觉得有两个州代理有点棘手,可能会重叠.只是为了澄清我如何担心重叠:
我在流媒体事件之前订阅了一个事件总线,以更新投资组合的状态.总线上出现了一些"开仓事件":我必须处理它们,但是由于尚未实现,投资组合可能处于正确状态.即使我能够处理这样的事件,我会在阅读流时再次找到它们.
更阴险:我打开流,我读了所有事件,我创建了一个状态.然后我订阅了总线:总线上的一些消息发生在steram读取结束和订阅开始之间的中间:这些事件丢失且聚合未处于正确状态.
请耐心等待,我的英语很差,争论很棘手,希望我设法分享我的疑问:)
假设我们有两个业务组件
这拥有用户.更改用户信息时,此组件将发布消息.所以例如"NewUserCreated"
处理与用户的通信.电子邮件,推文等.因此,该组件订阅用户消息将该信息的子集存储在其自己的商店中.
如果用户管理组件在出版物组件之前联机,会发生什么?出版物如何获得现有用户列表?它不应该知道用户管理组件如何存储其数据.
我们当前的系统是一个不使用的遗留系统domain events.我们将开始发布domain events.其他有界的背景将会听取这些domain events,但仅限于我们开始发布时,丢失所有过去的信息.
那么,如何处理这个没有记录这些事件的遗留系统,但是我们想要在实现这个事件存储系统之前有过去的历史?
这是一个很好的方法,试图弄清楚发生了什么,并尝试根据我们在数据库中的数据创建域事件(逆向工程)?
event-store ×10
cqrs ×7
c# ×2
nservicebus ×2
.net ×1
commondomain ×1
eventsource ×1
messaging ×1
unit-testing ×1