我一直在试验JOliver的Event Store 3.0作为项目的潜在组件,并一直试图通过Event Store来衡量事件的吞吐量.
我开始使用一个简单的线束,它基本上通过for循环迭代创建一个新流,并将一个包含GUID id和字符串属性的非常简单的事件提交给MSSQL2K8 R2 DB.调度员本质上是一个无操作员.
这种方法设法在8路HP G6 DL380上运行~3K操作/秒,数据库在单独的32路G7 DL580上运行.测试机器没有资源限制,阻塞看起来是我的限制.
有没有人有过测量Event Store吞吐量的经验以及取得了哪些数据?我希望能够获得至少1个数量级的吞吐量,以使其成为可行的选择.
我已经阅读了Jonathan Oliver关于处理无序事件的好帖子.
http://blog.jonathanoliver.com/cqrs-out-of-sequence-messages-and-read-models/
我们使用的解决方案是将消息出列并将其置于"保持表"中,直到收到具有先前序列的所有消息.当收到所有先前的消息后,我们将所有消息从保留表中取出,并通过适当的处理程序依次运行它们.一旦所有处理程序都成功执行,我们将从保留表中删除消息并将更新提交给读取的模型.
这适用于我们,因为域发布事件并使用适当的序列号标记它们.如果没有这个,下面的解决方案将变得更加困难 - 如果不是不可能的话.
此解决方案使用关系数据库作为持久性存储机制,但我们没有使用存储引擎的任何关系方面.与此同时,所有这一切都有一个警告.如果消息2,3和4到达但消息1从未到达,则我们不会应用其中任何消息.只有在出现错误处理消息1或消息1以某种方式丢失时,才会出现这种情况.幸运的是,很容易纠正我们的消息处理程序中的任何错误并重新运行消息.或者,在丢失消息的情况下,直接从事件存储重建构建读取模型.
得到了一些问题,尤其是关于他如何说我们总能向活动商店询问缺失的事件.
我有以下架构:
OFC.有端口和适配器,以及你能想象到的一切......
您有什么建议,如何通过即时一致性发送休息响应?我应该添加另一个事件总线并举办活动吗?(我猜投影必须发一些关于成功的事情.)
如何在这样的基于事件的系统中处理错误?(事件总线不是必需的,我可以解决与IoC容器的松散耦合,但我不认为通过这么多对象发送回调将是一个很好的解决方案.)
在joliver/EventStore中使用MongoDB persistance引擎导致错误Unknown discriminator value 'MyEvent'.问题只是在我尝试加载所有事件以重放事件时引起的this.storeEvent.Advanced.GetFrom(new DateTime(2010, 1,1))
这些问题是在ExtensionsMethods.cs中引起的
public class MyClassEvent : IDomainEvent { ... }
public static Commit ToCommit(this BsonDocument doc, IDocumentSerializer serializer)
{
if (doc == null)
return null;
var id = doc["_id"].AsBsonDocument;
var streamId = id["StreamId"].AsGuid;
var commitSequence = id["CommitSequence"].AsInt32;
var events = doc["Events"].AsBsonArray.Select(e => e.AsBsonDocument["Payload"].IsBsonDocument ? BsonSerializer.Deserialize<EventMessage>(e.AsBsonDocument["Payload"].AsBsonDocument) : serializer.Deserialize<EventMessage>(e.AsBsonDocument["Payload"].AsByteArray)).ToList();
var streamRevision = doc["Events"].AsBsonArray.Last().AsBsonDocument["StreamRevision"].AsInt32;
return new Commit(
streamId,
streamRevision,
doc["CommitId"].AsGuid,
commitSequence,
doc["CommitStamp"].AsDateTime,
BsonSerializer.Deserialize<Dictionary<string, object>>(doc["Headers"].AsBsonDocument),
events);
}
Run Code Online (Sandbox Code Playgroud)
我的配置是这样的:
Wireup.Init()
.UsingMongoPersistence(connectionName, new DocumentObjectSerializer())
.UsingBsonSerialization() …Run Code Online (Sandbox Code Playgroud) 我正在使用Java和Cassandra的事件采购从头开始构建项目.我的应用程序基于微服务,在某些用例中,信息将以异步方式处理.我想知道Message Queue(例如Rabbit,Active MQ Artemis,Kafka等)将在这个环境中改进技术堆栈的哪个部分,如果我理解这些场景,我将不会使用它.
我正在寻找使用JOlivers CommonDomain和EventStore测试域的好例子
我一直在看格雷格年轻人的视频,他有一个很好的简单的抽象聚合根测试夹具.
是否有类似于这些库可以使用的东西?
据推测,我们可以通过应用相同的命令集来恢复状态,那么为什么不简单地存储命令而不是事件呢?
我正在使用DDD与CQRS和事件采购.我需要在我的自定义实现中使用事件存储(特别是此事件存储)IEventStore来持久化并检索域事件,但我遇到了处理序列化/反序列化的方法时遇到困难.
这是我正在实现的接口:
public interface IEventStore
{
Task<IEnumerable<IDomainEvent>> GetEventsAsync(Identity aggregateIdentity, Type aggregateType);
Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}
Run Code Online (Sandbox Code Playgroud)
在我的实现之外,IEventStore我可以将每个映射器都包含IDomainEvent在一些可序列化/可反序列化的EventDto或json字符串中.那不是问题.但这些是我的限制:
我的域事件是实现的不可变对象IDomainEvent(即:没有setter)
我的域事件并不总是以通用方式容易地序列化/反序列化.它们通常具有抽象或接口属性,因此我的域事件和一些可序列化对象(如字符串json或事件DTO)之间的具体映射器在我的IEventStore实现之外决定.
我的IEventStore实现需要在一个通用的方式是,如果我添加新的域的事件类型,我应该不需要碰任何东西的范围内IEventStore实施
我的IEventStore实现可以接收注入的一些特定实现IMapper<TSource, TDestination>,以便我可以使用它们在特定类型(而不是接口)之间进行序列化/反序列化.
public interface IMapper<in TSource, out TDestination>
{
TDestination Map(TSource source); // I have implementations of this if needed
}
Run Code Online (Sandbox Code Playgroud)以下是我的尝试:
public class MyEventStore
: IEventStore
{
private readonly IStreamNameFactory _streamNameFactory;
private readonly …Run Code Online (Sandbox Code Playgroud) 存在实现CQRS +事件源架构的各种示例应用和框架,并且大多数描述使用事件处理器来从存储在事件存储中的域事件创建非规范化视图.
托管此体系结构的一个示例是作为web api,它接受写入端的命令并支持查询非规范化视图.此Web api可能会扩展到负载平衡服务器场中的许多计算机.
我的问题是读取模型事件处理程序托管在哪里?
可能的情况:
在单独的主机上托管在单一的Windows服务中. 如果是这样,那不会造成单点故障吗?这可能使部署复杂化,但它确实保证了单个执行线程.缺点是读取模型可能会出现延迟增加的情况.
作为web api本身的一部分托管. 如果我正在使用EventStore,例如,对于事件存储和事件订阅处理,将为每个单个事件触发多个处理程序(每个Web场进程中一个),从而在处理程序尝试读/写时引起争用到他们的读店?或者,对于给定的聚合实例,我们是否保证在事件版本顺序中一次处理一个所有事件?
我倾向于方案2,因为它简化了部署并且还支持需要也监听事件的流程管理器.虽然只有一个事件处理程序应该处理单个事件,但情况相同.
EventStore可以处理这种情况吗?其他人如何在最终一致的架构中处理事件处理?
编辑:
为了澄清,我在谈论将事件数据提取到非规范化表中的过程,而不是在CQRS中读取那些表中的"Q".
我想我正在寻找的是我们如何"应该"实现和部署可以支持冗余和扩展的读取模型/ sagas/etc的事件处理的选项,当然假设事件处理是以幂等方式处理的.
我已经阅读了两个可能的解决方案,用于处理在事件存储中保存为事件的数据,但我不明白哪个应该用于另一个.
活动巴士
事件总线/队列用于在保存事件后通常由存储库实现发布消息.感兴趣的各方(订阅者),例如阅读模型或传奇/流程管理者,以某种方式使用总线/队列来以幂等方式处理它.
如果队列是pub/sub,这意味着每个下游依赖项(读取模型,sagas等)每个只能支持一个进程来订阅队列.不止一个流程意味着每个流程处理相同的事件,然后竞争以使下游变更.幂等处理应该处理一致性/并发性问题.
如果队列是竞争消费者,我们至少可以在每个Web场节点中托管订户以实现冗余.虽然这需要每个下游依赖的队列; 一个用于sagas /流程管理器,一个用于每个读取模型等,因此存储库必须发布到每个以获得最终的一致性.
认购/饲料
订阅/馈送,其中感兴趣的各方(订户)按需读取事件流并从已知检查点获取事件以便处理成读取模型.
如果需要,这对于重新创建读取模型非常有用.但是,按照通常的发布/订阅模式,似乎每个下游依赖项只应使用一个订阅者进程.如果我们为同一事件流注册多个订户,例如每个Web场节点中有一个订户,则他们都将尝试处理和更新相同的相应读取模型.