序列化和反序列化域事件以在通用实现中从事件存储中保留和检索

ibe*_*dev 7 c# reflection serialization domain-events event-store

我正在使用DDD与CQRS和事件采购.我需要在我的自定义实现中使用事件存储(特别是此事件存储)IEventStore来持久化并检索域事件,但我遇到了处理序列化/反序列化的方法时遇到困难.

这是我正在实现的接口:

public interface IEventStore
{
    Task<IEnumerable<IDomainEvent>> GetEventsAsync(Identity aggregateIdentity, Type aggregateType);

    Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}
Run Code Online (Sandbox Code Playgroud)

在我的实现之外,IEventStore我可以将每个映射器都包含IDomainEvent在一些可序列化/可反序列化的EventDto或json字符串中.那不是问题.但这些是我的限制:

  • 我的域事件是实现的不可变对象IDomainEvent(即:没有setter)

  • 我的域事件并不总是以通用方式容易地序列化/反序列化.它们通常具有抽象或接口属性,因此我的域事件和一些可序列化对象(如字符串json或事件DTO)之间的具体映射器在我的IEventStore实现之外决定.

  • 我的IEventStore实现需要在一个通用的方式是,如果我添加新的域的事件类型,我应该不需要碰任何东西的范围内IEventStore实施

  • 我的IEventStore实现可以接收注入的一些特定实现IMapper<TSource, TDestination>,以便我可以使用它们在特定类型(而不是接口)之间进行序列化/反序列化.

    public interface IMapper<in TSource, out TDestination>
    {
        TDestination Map(TSource source); // I have implementations of this if needed
    }
    
    Run Code Online (Sandbox Code Playgroud)

以下是我的尝试:

public class MyEventStore
    : IEventStore
{
    private readonly IStreamNameFactory _streamNameFactory;
    private readonly IEventStoreConnection _eventStoreConnection; //this is the Greg Young's EventStore product that I want to use as database
    private readonly IDomainEventFactory _domainEventFactory;
    private readonly IEventDataFactory _eventDataFactory;

    public EventStore(
        IStreamNameFactory streamNameFactory, 
        IEventStoreConnection eventStoreConnection, 
        IDomainEventFactory domainEventFactory, 
        IEventDataFactory eventDataFactory)
    {
        _streamNameFactory = streamNameFactory;
        _eventStoreConnection = eventStoreConnection;
        _domainEventFactory = domainEventFactory;
        _eventDataFactory = eventDataFactory;
    }

    public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(
        Identity aggregateIdentity, 
        Type aggregateType)
    {
        var aggregateIdentityValue = aggregateIdentity.Value;
        var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateType);

        var streamEventSlice =
            await _eventStoreConnection.ReadStreamEventsForwardAsync(streamName, 0, Int32.MaxValue, false);

        var domainEvents = streamEventSlice
            .Events
            .Select(x => _domainEventFactory.Create(x));

        return domainEvents;
    }

    [SuppressMessage("ReSharper", "PossibleMultipleEnumeration")]
    public async Task PersistAsync(
        IAggregateRoot aggregateRoot, 
        IEnumerable<IDomainEvent> domainEvents)
    {
        var numberOfEvents = domainEvents.Count();
        var aggregateRootVersion = aggregateRoot.Version;
        var originalVersion = aggregateRootVersion - numberOfEvents;
        var expectedVersion = originalVersion - 1;

        var aggregateIdentityValue = aggregateRoot.AggregateIdentity.Value;
        var aggregateRootType = aggregateRoot.GetType();
        var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateRootType);
        var assemblyQualifiedName = aggregateRootType.AssemblyQualifiedName;

        var eventsToStore = domainEvents.Select(x => _eventDataFactory.Create(x, assemblyQualifiedName));

        await _eventStoreConnection.AppendToStreamAsync(streamName, expectedVersion, eventsToStore);
    }
}
Run Code Online (Sandbox Code Playgroud)

在实施中,问题主要在于您可以想象IDomainEventFactory.我需要一个实现以下接口的类:

public interface IDomainEventFactory
{
    IDomainEvent Create(ResolvedEvent resolvedEvent);
}
Run Code Online (Sandbox Code Playgroud)

此类需要知道IDomainEvent在运行时将resolveEvent反序列化所需的具体内容.换句话说,如果被检索的事件是json表示,MyThingCreatedEvent也许我可以使用诸如的服务IMapper<ResolvedEvent, MyThingCreatedEvent>.但是如果被检索的事件是json表示,MyThingUpdatedEvent那么我需要一个服务,如IMapper<ResolvedEvent, MyThingUpdatedEvent>.

我想到了一些方法.

选项1: 我认为我可以让IDomainEventFactory实现使用autofac,IComponentContext以便在运行时我可以设法做一些_componentContext.Resolve(theNeededType).但我不知道如何检索我需要的IMapper.也许这是可能的,但我对此表示怀疑.

选项2: 也许我可以拥有一些映射服务,例如IBetterMapper

public interface IBetterMapping
{
    TDestination Map<TDestination>(object source) where TDestination : class;
}
Run Code Online (Sandbox Code Playgroud)

这样我的工厂就可以委托知道如何将任何东西反序列化TDestination.但我会遇到同样的问题:我不知道如何在运行时从字符串创建类型,例如,做类似的事情,_myBetterMapper.Map<WhichTypeHere>并且还有实现Map方法的额外问题,我想这需要一些注册表和基于类型选择一个或另一个特定的映射器.

我真的很困惑.希望我能得到你们的帮助!:)

更新:我已经实现了我自己的解决方案并在我的个人 仓库中上传了项目:https://gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore我使用的解决方案是保持事件存储包装器不可知但是在DI注册时为那些有点"特殊"的事件提供自定义序列化器/解串器.EventStore允许添加自定义元数据头,因此我使用一些自定义头来指定每个数据流上的具体实现类型,以便在检索持久性事件时知道反序列化的位置.

ibe*_*dev 2

更新的答案:

随着时间的推移,我逐渐意识到整个方法是一种糟糕的做法。我认为领域事件不应该具有可能采取不同形状的抽象(多态)属性,因此在反序列化以准确了解事件被序列化成什么形状时会出现问题。

问题不是技术性的(尽管为此,我下面的答案仍然有效),而是哲学性的。

我坚信领域事件应该只使用基本类型。不会改变的东西(字符串,整数,也许是一些“安全”的自定义类型,例如金钱等)。拥有多态域事件没有多大意义。如果一个事件可以采取不同的形式,那么我们可能正在谈论不同的事件

重要的是要考虑到,在创建投影时(例如:在重播期间,或者只是在使用事件源实例化聚合期间),也必须反序列化非常旧的事件(例如:一年前引发的事件) )因此该事件应该正确反序列化而不会失败。想象一下,如果出于某种原因有人修改了该事件正在使用的类之一,并且现在旧信息无法反序列化到新类中,那么会出现什么混乱。我们将违反事件溯源中最基本的原则。

这就是为什么我认为我们不应该将域事件与复杂对象一起使用,除非我们 100% 确定这些类不会改变,并且我们根本不应该使用多态域事件。


我已经在 EventStore .NET Client 上实现了一个包装器,它实现了我的IEventStore接口并从幕后的任何内容中抽象出我的客户端应用程序。

public interface IEventStore
{
    Task<IEnumerable<IDomainEvent>> GetEventsAsync(Guid aggregateId, Type aggregateType);
    Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}
Run Code Online (Sandbox Code Playgroud)

我解决序列化/反序列化主要问题的方法是为“特殊”的域事件提供自定义序列化器/反序列化器(因为它们具有抽象或接口属性,除非知道其特定的具体类型,否则无法反序列化)。此外,对于每个持续存在的域事件,我都会保存元数据标头,说明它是哪种特定域事件类型以及它是哪种特定可序列化事件类型。

换句话说,持久化时的流程是这样的: IDomainEvent -> convert to a serializable type (if needed) -> transform in bytes -> save stream data

并且在检索时 Stream Data -> transform to serializable type -> transform to IDomainEvent

我已将整个项目上传到 GitLab 的个人存储库中: https: //gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore ,请随意查看并使用 xUnit 运行所有集成和单元测试以了解它。当然,请随时提供任何反馈!

我的解决方案的繁重工作在于需要使用事件存储的客户端部分。其基础设施层(在其主机应用程序中注册 Autofac)负责使用 Autofac 扩展注册 EventStore,并在需要时提供所需的自定义序列化器/反序列化器。

这样我就可以保持 EventStore 包装器的实现与特定设置和特定域事件完全无关。这是一个通用的解决方案。

该项目的自述文件澄清了这一点,但如果域事件是可序列化的(没有抽象属性),基本上事件存储可以这样注册:

var builder = new ContainerBuilder(); // Autofac container
builder
    .RegisterEventStore(
        ctx =>
        {
            var eventStoreOptions =
                new EventStoreOptions
                {
                    ConnectionString = "ConnectTo=tcp://admin:changeit@127.0.0.1:1113; HeartBeatTimeout=500";
                };
            return eventStoreOptions;
        });
var container = builder.Build();
Run Code Online (Sandbox Code Playgroud)

如果有一些特殊的领域事件,因为它们具有抽象属性,就像这样:

var builder = new ContainerBuilder();
builder
    .RegisterEventStore(
        ctx =>
        {
            var eventStoreOptions =
                new EventStoreOptions
                {
                    ConnectionString = "ConnectTo=tcp://admin:changeit@127.0.0.1:1113; HeartBeatTimeout=500";
                };
            return eventStoreOptions;
        },
        ctx =>
        {
            var customDomainEventMappersOptions =
                new CustomDomainEventMappersOptions()
                    .UsesCustomMappers<FakeDomainEventNotSerializable, FakeSerializableEvent>(
                        domainEvent =>
                        {
                            var mapper =
                                new FakeDomainEventNotSerializableToFakeSerializableEventMapper();
                            var result = mapper.Map(domainEvent);
                            return result;
                        },
                        serializableEvent =>
                        {
                            var mapper =
                                new FakeSerializableEventToFakeDomainEventNotSerializableMapper();
                            var result = mapper.Map(serializableEvent);
                            return result;
                        });
            return customDomainEventMappersOptions;
        });

var container = builder.Build();
Run Code Online (Sandbox Code Playgroud)