如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器

ibe*_*dev 10 generics masstransit adapter autofac message-bus

我成功地将 MassTransit 用于一个愚蠢的示例应用程序,我从发布者控制台应用程序发布一条消息(一个事件),并在两个不同的消费者处接收它,这些消费者也是使用 RabbitMq 的控制台应用程序。

这是整个示例项目 git repo: https : //gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus

我想要一个包含 MassTransit 功能的项目,以便我的 Publisher 和 Consumers 项目对 MassTransit 一无所知。依赖关系应该朝这个方向发展:

  • DiDrDe.MessageBus ==> MassTransit
  • DiDrDe.MessageBus ==> DiDrDe.Contracts
  • DiDrDe.Model ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.MessageBus
  • DiDrDe.Publisher ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.Model
  • DiDrDe.ConsumerOne ==> DiDrDe.Contracts
  • DiDrDe.ConsumerOne ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerOne ==> DiDrDe.Model
  • DiDrDe.ConsumerTwo ==> DiDrDe.Contracts
  • DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerTwo ==> DiDrDe.Model

请注意 DiDrDe.MessageBus 对 DiDrDe.Model 一无所知,因为它是一个通用项目,应该对任何消息类型都有效。

为了实现这一点,我正在实施适配器模式,以便我的自定义接口IEventDtoBus(发布事件)和IEventDtoHandler<TEventDto>(使用事件)是我的发布者和消费者都知道的。MassTransit 包装器项目(称为 DiDrDe.MessageBus)实现了一个EventDtoBusAdapter由 anIEventDtoBus和 a组成的适配器EventDtoHandlerAdapter<TEventDto>作为我唯一的IConsumer<TEventDto>由一个组成的泛型IEventDtoHandler<TEventDto>

我遇到的问题是 MassTransit 要求消费者注册的方式,因为我的消费者是一个通用消费者,它的类型在编译时不应被 MassTransit 包装器知道。

我需要找到一种方法来将EventDtoHandlerAdapter<TEventDto>我在运行时传递的每个 TEventDto 类型注册为使用者(例如作为类型的集合)。有关所有详细信息,请参阅我的存储库。

MassTransit 支持接受类型的重载方法(很好!正是我想要的),但它也需要第二个参数Func<type, object> consumerFactory,我不知道如何实现它。

更新 1问题是我无法注册这个通用消费者,例如:

consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>();
Run Code Online (Sandbox Code Playgroud)

因为我收到编译错误

严重性代码描述项目文件行抑制状态错误 CS0310 'EventDtoHandlerAdapter' 必须是具有公共无参数构造函数的非抽象类型,以便将其用作泛型类型或方法 'ConsumerExtensions.Consumer(IReceiveEndpointConfigurator, Action> )' DiDrDe.MessageBus C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs

更新 2:我已经尝试了几件事,并且我已经更新了我的 repo 上的项目。这些是我在 MassTransit 包装项目中的尝试。请注意,如果我向要处理的每个消息(事件)添加依赖项,我如何能够让一切正常工作。但我不希望那样......我不希望这个项目知道关于它可以处理的消息的任何信息。如果我能注册只知道消息类型的消费者就好了。

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    //THIS WORKS
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));

    // DOES NOT WORK
    //var typeEventDtoHandler = typeof(IEventDtoHandler<>).MakeGenericType(typeof(ThingHappened));
    //var eventDtoHandler = context.Resolve(typeEventDtoHandler);
    //consumer.Consumer(eventDtoHandler);

    // DOES NOT WORK
    //consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);

    // DOES NOT WORK
    //var consumerGenericType = typeof(IConsumer<>);
    //var consumerThingHappenedType = consumerGenericType.MakeGenericType(typeof(ThingHappened));
    //consumer.Consumer(consumerThingHappenedType,  null);
});
Run Code Online (Sandbox Code Playgroud)

更新 3:按照 Igor 的建议,我尝试执行以下操作:

var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
                            consumer.Consumer(adapterType, (Type x) => context.Resolve(x));
Run Code Online (Sandbox Code Playgroud)

但我收到一个运行时错误说

请求的服务“DiDrDe.MessageBus.EventDtoHandlerAdapter`1[[DiDrDe.Model.ThingHappened, DiDrDe.Model, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]”尚未注册。要避免此异常,请注册一个组件以提供服务,使用 IsRegistered() 检查服务注册,或使用 ResolveOptional() 方法来解析可选依赖项。

我什至尝试EventDtoHandlerAdapter<>单独注册为 IConsumer,以防万一,但没有运气。

builder
    .RegisterGeneric(typeof(EventDtoHandlerAdapter<>))
    .As(typeof(IConsumer<>))
    .SingleInstance();
Run Code Online (Sandbox Code Playgroud)

还有:

builder
  .RegisterType<EventDtoHandlerAdapter<ThingHappened>>()
  .AsSelf();
Run Code Online (Sandbox Code Playgroud)

它告诉我

System.ObjectDisposedException: '这个解析操作已经结束。使用 lambda 注册组件时,无法存储 lambda 的 IComponentContext 'c' 参数。相反,要么从“c”再次解析 IComponentContext,要么解析基于 Func<> 的工厂以从中创建后续组件

澄清一下,我需要注册的唯一消费者是我的EventDtoHandlerAdapter<TEventDto>. 它是通用的,所以基本上它会存在一个我支持的每个 TEventDto 的注册。问题是我不需要提前类型,所以我需要对类型进行操作。

更新 4: Igor 建议的新尝试。这次是“代理”。我已经用所有细节的最后一次尝试更新了我的回购。我有我的消费者和标记界面:

public interface IEventDtoHandler
{
}

public interface IEventDtoHandler<in TEventDto>
    : IEventDtoHandler
        where TEventDto : IEventDto
{
    Task HandleAsync(TEventDto eventDto);
}
Run Code Online (Sandbox Code Playgroud)

我有一个对大众运输一无所知的消费者的实现:

public class ThingHappenedHandler
    : IEventDtoHandler<ThingHappened>
{
    public Task HandleAsync(ThingHappened eventDto)
    {
        Console.WriteLine($"Received {eventDto.Name} " +
                            $"{eventDto.Description} at consumer one that uses an IEventDtoHandler");
        return Task.CompletedTask;
    }
}
Run Code Online (Sandbox Code Playgroud)

现在我的“包装消费者”就是我所说的适配器,因为它知道 MassTransit(它实现了 MassTransit IConsumer)。

public class EventDtoHandlerAdapter<TConsumer, TEventDto>
    : IConsumer<TEventDto>
        where TConsumer : IEventDtoHandler<TEventDto>
        where TEventDto : class, IEventDto
{
    private readonly TConsumer _consumer;

    public EventDtoHandlerAdapter(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public async Task Consume(ConsumeContext<TEventDto> context)
    {
        await _consumer.HandleAsync(context.Message);
    }
}
Run Code Online (Sandbox Code Playgroud)

现在最后一步是向 MassTransit 注册我的“包装消费者”。但由于它是通用的,我不知道该怎么做。这是问题。

我可以按照以下建议在 Autofac 中扫描和注册我的所有消费者类型:

var interfaceType = typeof(IEventDtoHandler);
var consumerTypes =
    AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(x => x.GetTypes())
        .Where(x => interfaceType.IsAssignableFrom(x)
                    && !x.IsInterface
                    && !x.IsAbstract)
        .ToList();
Run Code Online (Sandbox Code Playgroud)

所以现在我有了所有的消费者类型( 的所有实现IEventDtoHandler,包括我的ThingHappenedHandler)。现在怎么办?如何注册?

类似以下内容不起作用

foreach (var consumerType in consumerTypes)
{
    consumer.Consumer(consumerType, (Type x) => context.Resolve(x));
}
Run Code Online (Sandbox Code Playgroud)

但是我想它不起作用是正常的,因为我要注册的是我的EventDtoHandlerAdapter,这是真正的IConsumer.

所以,我想我没有理解你的建议。对不起!

我需要的是这样的:

//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<IEventDtoHandler<ThingHappened>, ThingHappened>(eventDtoHandler));
Run Code Online (Sandbox Code Playgroud)

但是不使用 ThingHappened 模型,因为模型不应该是已知的。这是我卡住的地方

更新 5:按照 Chris Patterson 的建议进行的新尝试(他的解决方案已合并到我的 repo 中的 master 中),但问题仍然存在。

澄清一下,DiDrDe.MessageBus必须与任何发布者、消费者和模型无关。它应该只依赖于 MassTransit 和DiDrDe.Contracts,而 Chris 的解决方案有如下一行:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
});
Run Code Online (Sandbox Code Playgroud)

这直接依赖于ThingHappened模型。这是不允许的,它实际上与我已经拥有的解决方案没有太大区别:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    //THIS works, but it uses ThingHappened explicitly and I don't want that dependency
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
});
Run Code Online (Sandbox Code Playgroud)

抱歉,如果这还不够清楚,但 DiDrDe.MessageBus 最终将成为在许多不同的使用者和发布者项目之间共享的 nuGet 包,并且它不应依赖于任何特定的消息/模型。

更新 6: 问题已解决。非常感谢 Igor 和 Chris 的时间和帮助。我已将解决方案推送到我的回购中。

PS:不幸的是,当我在同一个消费者中有两个处理程序处理同一个事件时,这个解决方案有其局限性,因为似乎只有一个处理程序被执行(两次)。我希望两个处理程序都被执行,或者只执行一个,但只执行一次(不是两次)。但这已经是另一个主题了:)

Igo*_*žić 9

在此处查看自定义消费者约定:https : //github.com/MassTransit/MassTransit/tree/develop/src/MassTransit.Tests/Conventional

创建您自己的 IMyConsumerInterface,IMyMessageInterface 将其插入该测试的代码中。ConsumerConvention.Register<CustomConsumerConvention>();在创建总线之前注册它。它应该工作。

此外,您可以围绕消费者上下文创建自己的包装器,并将其与消息一起传递。

LoadFrom (MassTransit.AutofacIntegration) 不适用于自定义约定,因此我必须手动注册消费者

foreach (var consumer in consumerTypes)
  cfg.Consumer(consumer, (Type x) => _container.Resolve(x));
Run Code Online (Sandbox Code Playgroud)

或者,如果您想使用“代理”方法,请执行以下操作:

public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
    where TMessage : class, IMyMessageInterface 
    where TConsumer : IMyConsumerInterface<TMessage>
{
    private readonly TConsumer _consumer;

    public WrapperConsumer(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public Task Consume(ConsumeContext<TMessage> context)
    {
        return _consumer.Consume(context.Message);
    }
}
Run Code Online (Sandbox Code Playgroud)

...

// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));
Run Code Online (Sandbox Code Playgroud)


适用于两种方法的附加代码

// marker interface
public interface IMyConsumerInterface
{
}

public interface IMyConsumerInterface<T> : IMyConsumerInterface
    where T : IMyMessageInterface 
{
    Task Consume(T message);
}

...

builder.RegisterAssemblyTypes(ThisAssembly)
           .AssignableTo<IMyConsumerInterface>()
           .AsSelf()
           .As<IMyConsumerInterface>();

...

var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
    .Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
    .ToList();
Run Code Online (Sandbox Code Playgroud)


回复:更新 5

builder.Register(context =>
{
    var ctx = context.Resolve<IComponentContext>();
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
        {
            foreach (var adapterType in adapterTypes)
                consumer.Consumer(adapterType, (Type type) => ctx .Resolve(adapterType));
        });
    });
    return busControl;
})
Run Code Online (Sandbox Code Playgroud)

  • (回复:更新5,无法评论克里斯的回答)你就快到了,而是 `consumer.Consumer&lt;EventDtoHandlerAdapter&lt;ThingHappened&gt;&gt;(context);` 使用 `foreach (var adapterType in adapterTypes)consumer.Consumer(适配器类型,(类型类型)=&gt; ctx.Resolve(适配器类型));`。您不需要模型,只需合同。删除模型并添加合同 (2认同)