Dav*_*rdi 3 .net concurrency event-sourcing neventstore
我是NEventStore的新手和一般的事件采购.在一个项目中,我想使用NEventStore来持久化聚合生成的事件,但是我有一些问题需要正确处理并发.
如何使用乐观锁写入同一个流?
假设我有2个不同线程在修订版1中加载的相同聚合的2个实例.然后是第一个线程调用命令A和第二个线程调用命令B. 使用乐观锁定其中一个聚合应该失败并出现并发异常.
我想使用maxRevision从加载聚合的点打开流,但似乎CommitChanges永远不会失败,如果我传递旧版本.
我错过了什么?使用NEventStore/Event Sourcing时,乐观锁定可能/正确吗?
这是我用来重现问题的代码:
namespace NEventStore.Example
{
using System;
using System.Transactions;
using NEventStore;
using NEventStore.Dispatcher;
using NEventStore.Persistence.SqlPersistence.SqlDialects;
internal static class MainProgram
{
private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
private static IStoreEvents store;
private static void Main()
{
using (var scope = new TransactionScope())
using (store = WireupEventStore())
{
Client1(revision: 0);
Client2(revision: 0);
scope.Complete();
}
Console.WriteLine(Resources.PressAnyKey);
Console.ReadKey();
}
private static IStoreEvents WireupEventStore()
{
return Wireup.Init()
.UsingInMemoryPersistence()
.Build();
}
private static void Client1(int revision)
{
using (var stream = store.OpenStream(StreamId, 0, revision))
{
var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void Client2(int revision)
{
using (var stream = store.OpenStream(StreamId, 0, revision))
{
var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
我希望客户端2失败,因为我使用旧版本打开流.
更新26/08/2013:我使用Sql server测试了相同的代码,似乎按预期工作.
namespace NEventStore.Example
{
using System;
using System.Transactions;
using NEventStore;
using NEventStore.Dispatcher;
using NEventStore.Persistence.SqlPersistence.SqlDialects;
internal static class MainProgram
{
private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
private static IStoreEvents store;
private static void Main()
{
using (store = WireupEventStore())
{
OpenOrCreateStream();
AppendToStream_Client1(revision: 1);
AppendToStream_Client2(revision: 1); // throws an error
// AppendToStream_Client2(revision: 2); // works
}
Console.WriteLine(Resources.PressAnyKey);
Console.ReadKey();
}
private static IStoreEvents WireupEventStore()
{
return Wireup.Init()
.LogToOutputWindow()
.UsingInMemoryPersistence()
.UsingSqlPersistence("EventStore") // Connection string is in app.config
.WithDialect(new MsSqlDialect())
.InitializeStorageEngine()
.UsingJsonSerialization()
.Build();
}
private static void OpenOrCreateStream()
{
using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
{
var @event = new SomeDomainEvent { Value = "Initial event." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void AppendToStream_Client1(int revision)
{
using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
{
var @event = new SomeDomainEvent { Value = "Second event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void AppendToStream_Client2(int revision)
{
using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
{
var @event = new SomeDomainEvent { Value = "Second event 2." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
那么回到我的问题:启用乐观锁定我应该在打开流时使用修订版吗?还有其他可能的实施或指南吗?
谢谢
小智 6
首先,内存持久性实现(其主要目的是测试)不是事务感知的.在您的原始示例中,客户端2将简单地将其事件附加到流中.尝试使用支持事务的持久性存储(SQL和Raven,但不是Mongo)运行上述内容.
其次,在打开流时指定最小/最大修订版用于不同目的:
ConcurrencyException则会出现竞争条件并且会发生.对大部分内容的支持将封装在域框架中.请参阅CommonDomain中的AggregateBase和EventStoreRepository
第三,最重要的是,在单个事务中更新> 1流是代码气味.如果您正在执行DDD/ES,则流表示单个聚合根,根据定义,它是一致性边界.在事务中创建/更新多个AR会破坏这一点.NEventStore的事务支持为(勉强)加入,以便它可以与其他工具的工作,即事务读取MSMQ/NServiceBus /不管和处理它,或者,事务调度提交信息到队列并标记为这样的命令.就个人而言,我建议你尽力避免使用2PC.
| 归档时间: |
|
| 查看次数: |
1887 次 |
| 最近记录: |