我目前正在以下列格式存储事件mycategory-mytype-uniqueid.在阅读了网上各种帖子后我所理解的,我应该得到一个名为" mycategory做" 的类别.我已经写了 :
fromCategory('mycategory')
.foreachStream()
.when({
$init: function(){
return {number: 0};
},
$any: function(state, ev){
linkTo('mynewstream', ev);
return {number: state.number};
}
});
Run Code Online (Sandbox Code Playgroud)
我现在希望得到一个流mynewstream以及一个变量的结果number,但我没有得到.那我错过了什么?
我目前每个聚合根和两个聚合根有一个事件流,Room并且RoomType.
a的行为Room取决于它的含义RoomType.为了分离两个聚合,The在聚合RoomType中仅表示为roomTypeId Room.更改RoomType由RoomTypeChanged事件表示.
该RoomTypes可单独管理,并需要在不同的骨料.
现在考虑以下用例:
当用户使a无效时RoomType,所有Rooms具有该功能的Roomtype应该切换到回退RoomType.
我想到了几种方法,但所有方法似乎都有问题:
让一个事件监听器监听一个RoomTypeInvalidated-event,然后发送一个有SwitchToFallbackRoomType所有的Room-aggregates Roomtype.我怎么会这样做?Roomtype除非我访问我的readmodel,这似乎是不正确的,否则无法知道哪些聚合有.即使我要加载所有聚合,也无法仅加载该类型的聚合,因为我无法加载所有流的子集(使用geteventstore).
当重新将RoomTypeChanged-events应用于Room聚合时,而不仅仅是应用它,检查是否RoomType仍然存在,但是再次,我怎么知道哪个RoomTypes存在(我和1的情况相同,但是反转) ?此外,在重新应用事件时加入逻辑似乎是错误的,它们应该只代表我认为的状态变化.
你怎么解决这个问题?
我编写了一个Event Sourced Aggregate,现在实现了一个Event Sourced Saga ......我注意到这两个是similair并创建了一个事件源对象作为基类,两者都是从这个派生出来的.
我在http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/看到了一个演示,但觉得可能存在一个问题,因为命令可能会丢失由于发送命令在写入事务之外,进程崩溃了吗?
public void Save(ISaga saga)
{
var events = saga.GetUncommittedEvents();
eventStore.Write(new UncommittedEventStream
{
Id = saga.Id,
Type = saga.GetType(),
Events = events,
ExpectedVersion = saga.Version - events.Count
});
foreach (var message in saga.GetUndispatchedMessages())
bus.Send(message); // can be done in different ways
saga.ClearUncommittedEvents();
saga.ClearUndispatchedMessages();
}
Run Code Online (Sandbox Code Playgroud)
相反,我正在使用Greg Young的EventStore,当我保存EventSourcedObject(聚合或传奇)时,序列如下:
我正在实现一个传奇的两个方面:
问题
据我所知,事件处理程序不应该发出命令(如果命令失败会发生什么?) - 但我对上面的确定,因为Saga是通过此事件代理控制命令创建(对事件做出反应)的实际操作,并且任何命令发送失败都可以在外部处理(在外部EventHandler处理,CommandEmittedFromSaga如果命令失败则重新发送)? …
背景
我们正在尝试在我们公司引入一种新的架构模式,并正在考虑使用服务总线进行事件采购的CQRS.我们目前正在开发的POC技术有NServiceBus,Event Store和MSMQ.我们希望在NServiceBus中使用两个不同的传输定义一个端点,我们的命令使用MSMQ,事件使用Event Store.我们企业的当前状态不允许我们现在轻松地将所有内容切换到Event Store,因为我们使用MSMQ对我们的遗留应用程序进行了大量投资,这是我们考虑混合方法的原因.
题
是否可以创建使用不同传输的单个NServiceBus端点?如果有,怎么样?如果不是,有哪些替代方案?
尽我所能,我无法在Windows Azure上访问我的Event Store安装.我遵循了wiki中的确切建议.是.我已经在Azure中正确设置了端口.但是,我无法从外部机器连接到它.
在一台单独的机器上,如果我卷曲{{mydomain}}.cloudapp.net,它会超时,但在Azure VM上,如果我卷曲localhost,我会得到Moved(重定向到完整的管理员)
似乎没有关于如何使用EventStore.Client.Embedded nuget包设置,运行和连接到嵌入式EventStore客户端的文档.使用以下代码:
ClusterVNode node = EmbeddedVNodeBuilder
.AsSingleNode()
.RunInMemory()
.OnDefaultEndpoints()
.Build();
var connection = EmbeddedEventStoreConnection.Create(node);
await connection.ConnectAsync();
var sampleEventData = new EventData(Guid.NewGuid(), "myTestEvent", false, new byte[] { 6, 10, 15 }, null);
WriteResult writeResult = await connection.AppendToStreamAsync("sampleStream, ExpectedVersion.NoStream, sampleEventData);
Run Code Online (Sandbox Code Playgroud)
一切似乎工作正常,直到该AppendToStreamAsync行,抛出以下异常:
EventStore.Core.Messages.ClientMessage + WriteEventsCompleted的预期响应,接收到EventStore.Core.Messages.ClientMessage + NotHandled.
为了解决这个异常,缺少哪些神奇的咒语?
我重新编写了“事件存储入门”项目,以了解正在发生的事情,现在在测试中,CanSaveExistingAggregate()我收到了WrongExpectedVersionException。问题是,为了尝试弄清楚正在发生的事情,我想知道预期的版本应该是什么,如何找到呢?在测试中,该行将repository.Save(firstSaved, Guid.NewGuid(), d => { });预期的版本计算为101,这是失败的地方:
[Test]
public void CanSaveExistingAggregate()
{
var savedId = SaveTestAggregateWithoutCustomHeaders(repository, 100 /* excludes TestAggregateCreated */);
var firstSaved = repository.GetById<TestAggregate>(savedId);
Console.WriteLine("version:" + firstSaved.Id);
firstSaved.ProduceEvents(50);
repository.Save(firstSaved, Guid.NewGuid(), d => { });
var secondSaved = repository.GetById<TestAggregate>(savedId);
Assert.AreEqual(150, secondSaved.AppliedEventCount);
}
Run Code Online (Sandbox Code Playgroud)
以及引发异常的代码:
public void Save(CommonDomain.IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
{
var commitHeaders = new Dictionary<string, object>
{
{CommitIdHeader, commitId},
{AggregateClrTypeHeader, aggregate.GetType().AssemblyQualifiedName}
};
updateHeaders(commitHeaders);
var streamName = aggregateIdToStreamName(aggregate.GetType(), aggregate.Id);
var newEvents = aggregate.GetUncommittedEvents().Cast<object>().ToList(); …Run Code Online (Sandbox Code Playgroud) 如果事件/消息的生产者和消费者都是基于.Net/C#的,我倾向于在有效载荷中使用元数据,以便能够将数据反序列化为C#POCO,如下所示:
Data
{
"X": {
"a": "bb811ea5-6993-e511-80fc-1458d043a750",
"b": "ddd",
"b": "dddd",
"d": true
}
"x1": 1.1234,
"x2": 2.3456,
"EventUtcDateTime": "2016-02-16T08:55:38.5103574Z"
}
Metadata
{
"TimeStamp": "02/16/2016 08:55:37",
"EventClrTypeName": "Bla.Di.Bla.SomeClass, Bla.Di.Bla, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"
}
Run Code Online (Sandbox Code Playgroud)
在生产者不是基于.Net/C#的情况下,什么是一个好的解决方案?
我正在开发一个简单的DDD +基于事件采购的应用程序,用于教育目的.
为了在存储到事件存储之前设置事件版本,我应该查询事件存储,但我的直觉告诉这是错误的,因为它会导致并发问题.
我错过了什么吗?
domain-driven-design event-stream cqrs event-sourcing get-event-store