ibe*_*dev 7 c# reflection serialization domain-events event-store
我正在使用DDD与CQRS和事件采购.我需要在我的自定义实现中使用事件存储(特别是此事件存储)IEventStore来持久化并检索域事件,但我遇到了处理序列化/反序列化的方法时遇到困难.
这是我正在实现的接口:
public interface IEventStore
{
Task<IEnumerable<IDomainEvent>> GetEventsAsync(Identity aggregateIdentity, Type aggregateType);
Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}
Run Code Online (Sandbox Code Playgroud)
在我的实现之外,IEventStore我可以将每个映射器都包含IDomainEvent在一些可序列化/可反序列化的EventDto或json字符串中.那不是问题.但这些是我的限制:
我的域事件是实现的不可变对象IDomainEvent(即:没有setter)
我的域事件并不总是以通用方式容易地序列化/反序列化.它们通常具有抽象或接口属性,因此我的域事件和一些可序列化对象(如字符串json或事件DTO)之间的具体映射器在我的IEventStore实现之外决定.
我的IEventStore实现需要在一个通用的方式是,如果我添加新的域的事件类型,我应该不需要碰任何东西的范围内IEventStore实施
我的IEventStore实现可以接收注入的一些特定实现IMapper<TSource, TDestination>,以便我可以使用它们在特定类型(而不是接口)之间进行序列化/反序列化.
public interface IMapper<in TSource, out TDestination>
{
TDestination Map(TSource source); // I have implementations of this if needed
}
Run Code Online (Sandbox Code Playgroud)以下是我的尝试:
public class MyEventStore
: IEventStore
{
private readonly IStreamNameFactory _streamNameFactory;
private readonly IEventStoreConnection _eventStoreConnection; //this is the Greg Young's EventStore product that I want to use as database
private readonly IDomainEventFactory _domainEventFactory;
private readonly IEventDataFactory _eventDataFactory;
public EventStore(
IStreamNameFactory streamNameFactory,
IEventStoreConnection eventStoreConnection,
IDomainEventFactory domainEventFactory,
IEventDataFactory eventDataFactory)
{
_streamNameFactory = streamNameFactory;
_eventStoreConnection = eventStoreConnection;
_domainEventFactory = domainEventFactory;
_eventDataFactory = eventDataFactory;
}
public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(
Identity aggregateIdentity,
Type aggregateType)
{
var aggregateIdentityValue = aggregateIdentity.Value;
var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateType);
var streamEventSlice =
await _eventStoreConnection.ReadStreamEventsForwardAsync(streamName, 0, Int32.MaxValue, false);
var domainEvents = streamEventSlice
.Events
.Select(x => _domainEventFactory.Create(x));
return domainEvents;
}
[SuppressMessage("ReSharper", "PossibleMultipleEnumeration")]
public async Task PersistAsync(
IAggregateRoot aggregateRoot,
IEnumerable<IDomainEvent> domainEvents)
{
var numberOfEvents = domainEvents.Count();
var aggregateRootVersion = aggregateRoot.Version;
var originalVersion = aggregateRootVersion - numberOfEvents;
var expectedVersion = originalVersion - 1;
var aggregateIdentityValue = aggregateRoot.AggregateIdentity.Value;
var aggregateRootType = aggregateRoot.GetType();
var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateRootType);
var assemblyQualifiedName = aggregateRootType.AssemblyQualifiedName;
var eventsToStore = domainEvents.Select(x => _eventDataFactory.Create(x, assemblyQualifiedName));
await _eventStoreConnection.AppendToStreamAsync(streamName, expectedVersion, eventsToStore);
}
}
Run Code Online (Sandbox Code Playgroud)
在实施中,问题主要在于您可以想象IDomainEventFactory.我需要一个实现以下接口的类:
public interface IDomainEventFactory
{
IDomainEvent Create(ResolvedEvent resolvedEvent);
}
Run Code Online (Sandbox Code Playgroud)
此类需要知道IDomainEvent在运行时将resolveEvent反序列化所需的具体内容.换句话说,如果被检索的事件是json表示,MyThingCreatedEvent也许我可以使用诸如的服务IMapper<ResolvedEvent, MyThingCreatedEvent>.但是如果被检索的事件是json表示,MyThingUpdatedEvent那么我需要一个服务,如IMapper<ResolvedEvent, MyThingUpdatedEvent>.
我想到了一些方法.
选项1:
我认为我可以让IDomainEventFactory实现使用autofac,IComponentContext以便在运行时我可以设法做一些_componentContext.Resolve(theNeededType).但我不知道如何检索我需要的IMapper.也许这是可能的,但我对此表示怀疑.
选项2: 也许我可以拥有一些映射服务,例如IBetterMapper
public interface IBetterMapping
{
TDestination Map<TDestination>(object source) where TDestination : class;
}
Run Code Online (Sandbox Code Playgroud)
这样我的工厂就可以委托知道如何将任何东西反序列化TDestination.但我会遇到同样的问题:我不知道如何在运行时从字符串创建类型,例如,做类似的事情,_myBetterMapper.Map<WhichTypeHere>并且还有实现Map方法的额外问题,我想这需要一些注册表和基于类型选择一个或另一个特定的映射器.
我真的很困惑.希望我能得到你们的帮助!:)
更新:我已经实现了我自己的解决方案并在我的个人 仓库中上传了项目:https://gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore我使用的解决方案是保持事件存储包装器不可知但是在DI注册时为那些有点"特殊"的事件提供自定义序列化器/解串器.EventStore允许添加自定义元数据头,因此我使用一些自定义头来指定每个数据流上的具体实现类型,以便在检索持久性事件时知道反序列化的位置.
更新的答案:
随着时间的推移,我逐渐意识到整个方法是一种糟糕的做法。我认为领域事件不应该具有可能采取不同形状的抽象(多态)属性,因此在反序列化以准确了解事件被序列化成什么形状时会出现问题。
问题不是技术性的(尽管为此,我下面的答案仍然有效),而是哲学性的。
我坚信领域事件应该只使用基本类型。不会改变的东西(字符串,整数,也许是一些“安全”的自定义类型,例如金钱等)。拥有多态域事件没有多大意义。如果一个事件可以采取不同的形式,那么我们可能正在谈论不同的事件。
重要的是要考虑到,在创建投影时(例如:在重播期间,或者只是在使用事件源实例化聚合期间),也必须反序列化非常旧的事件(例如:一年前引发的事件) )因此该事件应该正确反序列化而不会失败。想象一下,如果出于某种原因有人修改了该事件正在使用的类之一,并且现在旧信息无法反序列化到新类中,那么会出现什么混乱。我们将违反事件溯源中最基本的原则。
这就是为什么我认为我们不应该将域事件与复杂对象一起使用,除非我们 100% 确定这些类不会改变,并且我们根本不应该使用多态域事件。
我已经在 EventStore .NET Client 上实现了一个包装器,它实现了我的IEventStore接口并从幕后的任何内容中抽象出我的客户端应用程序。
public interface IEventStore
{
Task<IEnumerable<IDomainEvent>> GetEventsAsync(Guid aggregateId, Type aggregateType);
Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}
Run Code Online (Sandbox Code Playgroud)
我解决序列化/反序列化主要问题的方法是为“特殊”的域事件提供自定义序列化器/反序列化器(因为它们具有抽象或接口属性,除非知道其特定的具体类型,否则无法反序列化)。此外,对于每个持续存在的域事件,我都会保存元数据标头,说明它是哪种特定域事件类型以及它是哪种特定可序列化事件类型。
换句话说,持久化时的流程是这样的:
IDomainEvent -> convert to a serializable type (if needed) -> transform in bytes -> save stream data
并且在检索时
Stream Data -> transform to serializable type -> transform to IDomainEvent
我已将整个项目上传到 GitLab 的个人存储库中: https: //gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore ,请随意查看并使用 xUnit 运行所有集成和单元测试以了解它。当然,请随时提供任何反馈!
我的解决方案的繁重工作在于需要使用事件存储的客户端部分。其基础设施层(在其主机应用程序中注册 Autofac)负责使用 Autofac 扩展注册 EventStore,并在需要时提供所需的自定义序列化器/反序列化器。
这样我就可以保持 EventStore 包装器的实现与特定设置和特定域事件完全无关。这是一个通用的解决方案。
该项目的自述文件澄清了这一点,但如果域事件是可序列化的(没有抽象属性),基本上事件存储可以这样注册:
var builder = new ContainerBuilder(); // Autofac container
builder
.RegisterEventStore(
ctx =>
{
var eventStoreOptions =
new EventStoreOptions
{
ConnectionString = "ConnectTo=tcp://admin:changeit@127.0.0.1:1113; HeartBeatTimeout=500";
};
return eventStoreOptions;
});
var container = builder.Build();
Run Code Online (Sandbox Code Playgroud)
如果有一些特殊的领域事件,因为它们具有抽象属性,就像这样:
var builder = new ContainerBuilder();
builder
.RegisterEventStore(
ctx =>
{
var eventStoreOptions =
new EventStoreOptions
{
ConnectionString = "ConnectTo=tcp://admin:changeit@127.0.0.1:1113; HeartBeatTimeout=500";
};
return eventStoreOptions;
},
ctx =>
{
var customDomainEventMappersOptions =
new CustomDomainEventMappersOptions()
.UsesCustomMappers<FakeDomainEventNotSerializable, FakeSerializableEvent>(
domainEvent =>
{
var mapper =
new FakeDomainEventNotSerializableToFakeSerializableEventMapper();
var result = mapper.Map(domainEvent);
return result;
},
serializableEvent =>
{
var mapper =
new FakeSerializableEventToFakeDomainEventNotSerializableMapper();
var result = mapper.Map(serializableEvent);
return result;
});
return customDomainEventMappersOptions;
});
var container = builder.Build();
Run Code Online (Sandbox Code Playgroud)