我想知道使用EventStore有什么好处(http://geteventstore.com)比在MongoDb中自己实现事件采购.
我问的原因是,我们公司有很多人每天与MongoDb合作.但它们不适用于Event Sourcing.虽然他们并没有完全关注这个主题,但他们也不打算在任何地方开始实施.
我即将开始一个非常适合Event Sourcing的项目.大约有16个定义明确的事件,大约有7个定义明确的预测.我说"关于"因为我知道一旦他们看到产品在使用中就会有更多的预测和事件需求.
这种方法将首先是API,我们组织的其他部分将使用REST Api.
虽然我已经按照Greg Young定义的方式阅读了很多关于事件采购的内容,但我从未实际实施过Event Sourcing解决方案.
这是一个绿色的田野项目.没有技术限制,因为我们将把所有内容公开为REST接口.因此,如果有人有使用MongoDb的EvenStore或Event Sourcing的工作经验,请赐教.
关于事件采购的几乎完全不相关的问题:您是否曾直接查询事件存储?或者您是否总是创建新的预测和重播事件以填充这些预测?
最近我正在学习CQRS,并希望改变我的系统以使用事件采购模式.
但我发现在.Net平台上,有两个Event Store实现.
这两个实现让我很困惑 - 有人可以解释一下它们之间的主要区别.
我正在认真考虑使用Greg Young的EvenStore来实现基于事件的系统.但是,我不确定Greg Young的EventStore是否支持Snapshots.快照是我的应用程序的关键要求,因为我们不希望每次都重放所有事件来构造对象状态.
有人可以向我提供有关如何在EventStore中创建快照的示例(Greg Young Version)吗?
问题:
两名员工(A&B)同时离线,同时编辑客户#123,比如版本#20,当离线继续进行更改时......
场景:
1 - 两名员工编辑客户#123并对一个或多个相同的属性进行更改.
2 - 两名员工编辑客户#123但不要做出相同的更改(他们互相交叉而不接触).
......然后他们都回到网上,第一名员工A追加,从而将客户更改为版本#21,然后将员工B更改为版本#20
问题:
我们在方案1中保留了哪些变化?
我们可以在方案2中进行合并吗?
语境:
1 - CQRS +事件采购风格系统
2 - 使用事件源Db作为队列
3 - 读模型的最终一致性
4 - RESTful API

编辑-1:到目前为止基于答案的澄清:
为了执行精细的粒度合并,我需要为一个表单中的每个字段设置一个命令?

在上面,ChangeName,ChangeSupplier,ChangeDescription等的细粒度命令,每个都有自己的时间戳,允许自动合并事件A和B都更新ChangedName?
编辑-2:根据特定事件存储的使用进行跟进:
好像我会利用@GetEventStore来保持我的事件流的持久性.
他们使用乐观并发如下:
流中的每个事件都将流版本增加1
写入可以指定预期版本,在编写器上使用ES-ExpectedVersion标头
-1指定流不应该已存在
0及以上指定流版本
如果流不在版本中,写入将失败,您要么使用新的预期版本号重试,要么重新处理该行为并确定如果您选择它就行.
如果未指定ES预期版本,则禁用乐观并发控制
在这种情况下,Optimistic Concurrency不仅基于消息ID,还基于事件#
domain-driven-design occasionallyconnected cqrs event-sourcing get-event-store
我正在编写一个简单的事件调度程序,其中我的事件作为具有clr类型名称的对象和表示原始事件的json对象(在将byte []处理到jobject之后)被触发.如果有人想知道具体细节,我正在使用GetEventStore.
我想把那个clr类型做两件事:
我已经设法使用以下代码使第1部分正常工作:
var processedEvent = ProcessRawEvent(@event);
var t = Type.GetType(processedEvent.EventClrTypeName);
var type = typeof(IHandlesEvent<>).MakeGenericType(t);
var allHandlers = container.ResolveAll(type);
foreach (var allHandler in allHandlers)
{
var method = allHandler.GetType().GetMethod("Consume", new[] { t });
method.Invoke(allHandler, new[] { processedEvent.Data });
}
Run Code Online (Sandbox Code Playgroud)
ATM的问题是processedEvent.Data是一个JObject - 我知道processedEvent.Data的类型,因为我已经在它上面定义了它.
如何在不对类型名称进行任何讨厌的切换的情况下将该JObject解析为类型t?
我已经开始在.Net中创建一个测试应用程序,它使用Greg Young的EventStore作为CQRS/ES的后备存储.
为了便于加载完整聚合,我保存到名为"agg-123"的流.例如,对于id为553的产品聚合,会有一个名为"product-553"的流.然后,对于"Order"聚合,该流将被命名为"order-123".
从补充和保存事件来看,这很有效.
我现在正在尝试创建一个侦听器,它将侦听某些流,然后填充查询数据库.我看到的订阅方法似乎只能订阅"order-123"或"all".我看不出如何订阅"product-"或"order-",或两者兼而有之.
我想也是
有人建议吗?
几天以来,我一直试图弄清楚如何告知其余的微服务,在微服务A中创建一个新实体,将该实体存储在MongoDB中.
我想要:
微服务之间的耦合很低
避免像两阶段提交(2PC)这样的微服务之间的分布式事务
起初像RabbitMQ这样的消息代理似乎是一个很好的工具,但后来我看到了在MongoDB 中提交新文档并在代理中发布消息而不是原子的问题.
为何选择活动?by eventuate.io:

解决此问题的一种方法意味着通过添加一个标记来说明文档的模式有点脏,该标记表明文档是否已在代理中发布,并且具有在MongoDB中搜索未发布文档的预定后台进程,并使用这些文档将这些文档发布到代理确认,当确认到达时,文档将被标记为已发布(使用at-least-once和idempotency语义).这个解决方案在此提出并且这个答案.
阅读Chris Richardson 的微服务简介我最后在这个关于开发功能域模型的伟大演示中,其中一个幻灯片询问:
如何在没有2PC的情况下自动更新数据库并发布事件和发布事件?(双写问题).
答案很简单(在下一张幻灯片中)
更新数据库并发布事件
这是一种不同的方式来这一个是基于CQRS一拉Greg Young的.
域存储库负责发布事件,这通常在单个事务内部,同时将事件存储在事件存储中.
我认为委托将事件存储和发布到事件存储的责任是一件好事,因为避免了2PC或后台进程的需要.
然而,以某种方式它的真实是:
如果您依赖事件存储来发布事件,那么您将与存储机制紧密耦合.
但是,如果我们采用消息代理来进行微服务的交互,我们可以说同样的话.
令我担心的是,事件存储似乎成为单点故障.
如果我们从eventuate.io看这个例子

我们可以看到,如果事件存储已关闭,我们无法创建帐户或资金转移,失去了微服务的优势之一.(虽然系统将继续响应查询).
因此,确认eventuate示例中使用的事件存储是单点故障是正确的吗?
我最近开始尝试使用.net-core(nuget包)的EventStore客户端API .但是,我正在努力让事件写入流中.下面是我用来建立连接的代码:
private readonly string _eventStoreName = "localhost";
private readonly string _eventStorePort = "1113";
protected IEventStoreConnection Connection;
public EventStoreTestFixture()
{
var eventStoreIpAddress = Dns.GetHostAddressesAsync(_eventStoreName).Result;
var ipAddress = GetIpAddressFromHost(eventStoreIpAddress);
var connectionSettings = ConnectionSettings.Create()
.SetDefaultUserCredentials(new UserCredentials("admin", "changeit"))
.EnableVerboseLogging();
Connection = EventStoreConnection.Create(connectionSettings,new IPEndPoint(ipAddress, int.Parse(_eventStorePort)));
Connection.ConnectAsync().Wait();
}
private static IPAddress GetIpAddressFromHost(IPAddress[] eventStoreIpAddress)
{
return
eventStoreIpAddress.FirstOrDefault(
ipAddress => ipAddress.AddressFamily.Equals(AddressFamily.InterNetwork));
}
Run Code Online (Sandbox Code Playgroud)
这是我试图写入EventStream的代码:
public class EmployeeEventSourcedRepository<T> where T : class, IEvenSourced
{
private readonly IEventStoreConnection _connection;
public EmployeeEventSourcedRepository(IEventStoreConnection connection)
{
_connection = connection; …Run Code Online (Sandbox Code Playgroud) 当我尝试发布没有eventId的事件时,我得到:
提供HTTP/1.1 400空eventId.
如果我用随机的东西填充eventId,那么我得到:
HTTP/1.1 400写请求正文无效
在服务器的标准输出中,我看到:
将值"foo"转换为"System.Guid"类型时出错
获得有效的EventId需要什么?
命令:
curl -i -d @event.json localhost:2113/streams/birthday-offer \
-H "Content-Type:application/vnd.eventstore.events+json"
Run Code Online (Sandbox Code Playgroud)
event.json:
[
{
"eventId": "foo",
"eventType": "bar",
"data": {
"who": "11111111111",
"which": "birthday-offer"
}
}
]
Run Code Online (Sandbox Code Playgroud)
我不是.NET语言.
存在实现CQRS +事件源架构的各种示例应用和框架,并且大多数描述使用事件处理器来从存储在事件存储中的域事件创建非规范化视图.
托管此体系结构的一个示例是作为web api,它接受写入端的命令并支持查询非规范化视图.此Web api可能会扩展到负载平衡服务器场中的许多计算机.
我的问题是读取模型事件处理程序托管在哪里?
可能的情况:
在单独的主机上托管在单一的Windows服务中. 如果是这样,那不会造成单点故障吗?这可能使部署复杂化,但它确实保证了单个执行线程.缺点是读取模型可能会出现延迟增加的情况.
作为web api本身的一部分托管. 如果我正在使用EventStore,例如,对于事件存储和事件订阅处理,将为每个单个事件触发多个处理程序(每个Web场进程中一个),从而在处理程序尝试读/写时引起争用到他们的读店?或者,对于给定的聚合实例,我们是否保证在事件版本顺序中一次处理一个所有事件?
我倾向于方案2,因为它简化了部署并且还支持需要也监听事件的流程管理器.虽然只有一个事件处理程序应该处理单个事件,但情况相同.
EventStore可以处理这种情况吗?其他人如何在最终一致的架构中处理事件处理?
编辑:
为了澄清,我在谈论将事件数据提取到非规范化表中的过程,而不是在CQRS中读取那些表中的"Q".
我想我正在寻找的是我们如何"应该"实现和部署可以支持冗余和扩展的读取模型/ sagas/etc的事件处理的选项,当然假设事件处理是以幂等方式处理的.
我已经阅读了两个可能的解决方案,用于处理在事件存储中保存为事件的数据,但我不明白哪个应该用于另一个.
活动巴士
事件总线/队列用于在保存事件后通常由存储库实现发布消息.感兴趣的各方(订阅者),例如阅读模型或传奇/流程管理者,以某种方式使用总线/队列来以幂等方式处理它.
如果队列是pub/sub,这意味着每个下游依赖项(读取模型,sagas等)每个只能支持一个进程来订阅队列.不止一个流程意味着每个流程处理相同的事件,然后竞争以使下游变更.幂等处理应该处理一致性/并发性问题.
如果队列是竞争消费者,我们至少可以在每个Web场节点中托管订户以实现冗余.虽然这需要每个下游依赖的队列; 一个用于sagas /流程管理器,一个用于每个读取模型等,因此存储库必须发布到每个以获得最终的一致性.
认购/饲料
订阅/馈送,其中感兴趣的各方(订户)按需读取事件流并从已知检查点获取事件以便处理成读取模型.
如果需要,这对于重新创建读取模型非常有用.但是,按照通常的发布/订阅模式,似乎每个下游依赖项只应使用一个订阅者进程.如果我们为同一事件流注册多个订户,例如每个Web场节点中有一个订户,则他们都将尝试处理和更新相同的相应读取模型.