通过Unity的同一消息的多个消费者无法在MassTransit中工作

jul*_*gon 6 .net esb masstransit unity-container command-pattern

我最近遇到了很多问题,因为包中似乎有一个错误MassTransit.UnityIntegration,主要是因为注册名称没有被考虑.

例如,如果我像这样注册我的类:

var container = new UnityContainer()
    .RegisterType<Consumes<Command1>.All, Handler1>("Handler1")
    .RegisterType<Consumes<Command1>.All, Handler3>("Handler3");
Run Code Online (Sandbox Code Playgroud)

几行之后,我使用LoadFrom扩展方法来获取容器中的注册使用者,如下所示:

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(_s => _s.LoadFrom(container));
    });
Run Code Online (Sandbox Code Playgroud)

当相关消息到达总线时,我的处理程序永远不会被调用.

在思考了一段时间之后,我决定看一下实现,并且很清楚为什么会发生这种情况:

这是方法中的主要代码LoadFrom:

public static void LoadFrom(this SubscriptionBusServiceConfigurator configurator, IUnityContainer container)
{
    IList<Type> concreteTypes = FindTypes<IConsumer>(container, x => !x.Implements<ISaga>());
    if (concreteTypes.Count > 0)
    {
        var consumerConfigurator = new UnityConsumerFactoryConfigurator(configurator, container);

        foreach (Type concreteType in concreteTypes)
            consumerConfigurator.ConfigureConsumer(concreteType);
    }

    ...

}
Run Code Online (Sandbox Code Playgroud)

请注意,它只查找类型,并且不会向前传递任何名称信息.这是FindTypes<T>实施:

static IList<Type> FindTypes<T>(IUnityContainer container, Func<Type, bool> filter)
{
    return container.Registrations
                        .Where(r => r.MappedToType.Implements<T>())
                        .Select(r => r.MappedToType)
                        .Where(filter)
                        .ToList();
}
Run Code Online (Sandbox Code Playgroud)

在几个间接之后,这一切都归结为类中的这一行UnityConsumerFactory<T>,它实际上创建了消费者的实例:

var consumer = childContainer.Resolve<T>();
Run Code Online (Sandbox Code Playgroud)

当有多个注册时,这绝对不适用于Unity,因为在Unity中注册(然后解决)多个实现的唯一方法是在RegisterType调用时为它们命名,然后在调用时指定此名称Resolve.

也许我在这一切中遗漏了一些完全基本的东西,错误就在我身上?可以在此处找到MassTransit Unity组件的来源.我没有查看其他容器的代码,因为我不熟悉它们,但我认为这已经以某种方式处理了?我认为在同一个容器中为同一个消息类型拥有多个消费者实际上很常见.

在这种特殊情况下,不仅要传递Type容器中的注册,还要传递用于注册的名称.

更新

那么Travis花时间解释它的问题就更清楚了.我应该早点注意到它.

看来我应该直接注册类型,以便在工厂内正确解析它们,如下所示:

var container = new UnityContainer()
    .RegisterType<Handler1>()
    .RegisterType<Handler3>();
Run Code Online (Sandbox Code Playgroud)

使用这种方法,我也可以省略注册名称,因为现在它们在容器内的构建键是不同的.

好吧,如果这是我们的真实场景,这将完美地工作,但事实并非如此.让我解释一下我们到底在做什么:

在我们开始使用MassTransit之前,我们已经有一个用于命令模式的接口,称为ICommandHandler<TCommand>,其中TCommand是系统中命令的基本模型.当我们开始考虑使用服务总线时,从一开始就很清楚,应该可以稍后切换到另一个服务总线实现而不会有太多麻烦.考虑到这一点,我开始在我们的命令界面上创建一个抽象,表现得像MT期望的消费者之一.这就是我想出的:

public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All
    where T : class, ICommand
{
    private readonly ICommandHandler<T> _commandHandler;

    public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
    {
        _commandHandler = commandHandler;
    }

    public void Consume(T _message)
    {
        _commandHandler.Handle(_message);
    }
}
Run Code Online (Sandbox Code Playgroud)

这是一个非常简单的适配器类.它接收一个ICommandHandler<T>实现并使其行为像一个Consumes<T>.All实例.很遗憾,MT要求消息模型是类,因为我们没有对命令进行约束,但这是一个小小的不便,我们继续where T : class向我们的接口添加约束.

然后,由于我们的处理程序接口已经在容器中注册,因此将使用此适配器实现注册MT接口并让容器在其上注入实际实现.例如,一个更现实的例子(直接来自我们的代码库):

.RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
.RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor")
.RegisterType<Consumes<ApplicationInstallationResult>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationResult>>()
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Recorder", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Recorder")))
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Executor", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Executor")))
Run Code Online (Sandbox Code Playgroud)

指定的注册有点复杂但需要,因为我们现在有两个消费者用于同一消息.虽然不像我们希望的那样干净,但我们可以忍受这一点,因为这促进了我们的代码与MassTransit特定逻辑的巨大分离:适配器类位于一个单独的程序集中,仅由系统中的最后一层引用,用于容器注册目的.这似乎是一个非常好的主意,但现在通过容器集成类背后的查找逻辑确认不支持.

请注意,我无法在此处注册具体类,因为中间有一个通用适配器类.

更新2:

按照特拉维斯的建议,我尝试了这个简单的代码也没有用(我不明白为什么,因为它似乎完全有效).这是一个明确的消费者工厂注册,没有任何自动容器集成:

_sbc.Consume(() => container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))
Run Code Online (Sandbox Code Playgroud)

该解析调用正确地给了我以前注册的CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>实例,它实现了Consumes<ApplicationInstallationCommand>.All,而实际应该是支持的基本接口之一.ApplicationInstallationCommand在此之后发布权限不执行任何操作,就好像处理程序无效或类似.

这有效:

_sbc.Consume(() => (CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>) container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))
Run Code Online (Sandbox Code Playgroud)

很明显,API中的一些内容是以非泛型的方式处理编译类型,而不是基于通用接口.

我的意思是......这是可行的,但注册代码没有明显的原因(由于我将在MT的部分考虑为'非标准实现细节').也许我只是抓住这里的稻草?也许这一切归结为"为什么MT不接受它自己的,已经是通用的界面?" 为什么在编译时需要具体类型才能看到它是一个消息处理程序,即使我传递给它的实例Consumes<X>.All也是在编译时输入的?

更新3:

在下面与Travis讨论之后,我决定完全放弃UnityIntegration程序集Consumer,并对订阅进行独立调用.

我在MassTransit特定程序集中创建了一个小扩展类,以方便:

public static class CommandHandlerEx
{
    public static CommandHandlerToConsumerAdapter<T> ToConsumer<T>(this ICommandHandler<T> _handler)
        where T : class, ICommand
    {
        return new CommandHandlerToConsumerAdapter<T>(_handler);
    }
}
Run Code Online (Sandbox Code Playgroud)

最后注册了这样的处理程序:

var container = new UnityContainer()
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor");

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(RegisterConsumers);
    });

private void RegisterConsumers(SubscriptionBusServiceConfigurator _s)
{
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Recorder").ToConsumer());
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Executor").ToConsumer());
}
Run Code Online (Sandbox Code Playgroud)

在昨天使用了整整一天来尝试解决问题之后,我强烈建议您远离容器扩展程序集,如果您希望从容器中预期的行为和/或您想要自定义类等(就像我做的那样去耦)我们的消息类来自MT特定代码)主要有两个原因:

  1. 扩展中的逻辑遍历容器中的注册以查找使用者类.在我看来,这是一个糟糕的设计.如果有东西需要从容器中实现,它应该只调用它ResolveResolveAll它的接口(或非Unity术语中的等价物),而不关心究竟是什么注册以及它们的具体类型是什么.对于假定容器可以返回未明确注册的类型的代码,这会产生严重后果.幸运的是,这些类并非如此,但我们确实有一个容器扩展,可以根据构建键自动创建装饰器类型,并且不需要在容器上显式注册它们.

  2. 使用者注册使用实例MappedToType上的属性ContainerRegistration来调用Resolve容器.这在任何情况下都是完全错误的,而不仅仅是在MassTransit的背景下.Unity中的类型要么注册为映射(如上面的摘录,带有FromTo组件),要么直接作为单个具体类型注册.在两种情况下,逻辑应该使用RegisteredType类型来从容器中解析.它现在的工作方式是,如果你碰巧用他们的接口注册处理程序,MT将完全绕过你的注册逻辑并调用具体类型的解析,这在Unity开箱即用,可能会导致不可预测的行为,因为你认为它应该像你注册的单身,但它最终成为一个瞬态对象(默认),例如.

现在回想一下,我可以看到它原本相信的要复杂得多.在这个过程中也有相当多的学习,所以这很好.

更新4:

昨天我决定在进行最终签入之前重构整个适配器方法.我使用MassTransit的界面模式来创建我的适配器,因为我认为这是一个非常好的和干净的语法.

结果如下:

public sealed class CommandHandlerToConsumerAdapter<T>
    where T : class, ICommand
{
    public sealed class All : Consumes<T>.All
    {
        private readonly ICommandHandler<T> m_commandHandler;

        public All(ICommandHandler<T> _commandHandler)
        {
            m_commandHandler = _commandHandler;
        }

        public void Consume(T _message)
        {
            m_commandHandler.Handle(_message);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

不幸的是,这会破坏MassTransit的代码,因为在引用的Magnum库中的实用程序方法上,在一个名为的扩展方法上有一个未处理的异常ToShortTypeName.

这是一个例外:

MassTransit中的ArgumentOutOfRangeException接收

在System.String.Substring(的Int32的startIndex,的Int32长度)
在Magnum.Extensions.ExtensionsToType.ToShortTypeName(类型类型)
在MassTransit.Pipeline.Sinks.ConsumerMessageSink 2.<>c__DisplayClass1.<Selector>b__0(IConsumeContext1上下文)中d:\ BuildAgent-02 \工作\ aa063b4295dfc097\SRC\MassTransit \管道\水槽\ ConsumerMessageSink.cs:在MassTransit.Pipeline.Sinks.InboundConvertMessageSink`1线51 <> c__DisplayClass2 <> c__DisplayClass4.b__1(IConsumeContext x)的在d:\ BuildAgent-02 \工作\ aa063b4295dfc097\SRC\MassTransit\Pipeline\Sinks\InboundConvertMessageSink.cs:第45行,位于d:\ BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Context\ServiceBusReceiveContext.cs中的MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(IReceiveContext context):第162行

Tra*_*vis 4

虽然我不知道 Unity 集成,但对于所有容器,您必须将消费者注册为容器中的具体类型,而不是接口Consumes<>。我认为这只是RegisterType<Handler1, Handler1>(),但我对此并不完全确定。

如果您不喜欢LoadFrom容器的扩展,则无论如何都不需要使用它。您始终可以自己解析消费者并通过_sbc.Consume(() => container.resolve<YourConsumerType>())在配置中注册它们。LoadFrom对于以常见方式使用容器的人来说,该扩展只是一个说服力。

以下代码有效,它按照我期望的方式使用容器,无需更多了解您的域,即可使用它。如果您想更好地了解消息如何绑定,我建议使用 RabbitMQ,因为您可以通过交换绑定轻松查看事物的最终结果。在这一点上,这远远超出了一个问题,如果您有任何进一步的信息,我会将其添加到邮件列表中。

using System;
using MassTransit;
using Microsoft.Practices.Unity;

namespace MT_Unity
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var container = new UnityContainer()
                .RegisterType<ICommandHandler<MyCommand>, MyCommandHandler>()
                .RegisterType<CommandHandlerToConsumerAdapter<MyCommand>>())

            using (IServiceBus consumerBus = ServiceBusFactory.New(sbc =>
                    {
                        sbc.ReceiveFrom("rabbitmq://localhost/consumer");
                        sbc.UseRabbitMq();


                        sbc.Subscribe(s => s.Consumer(() => container.Resolve<CommandHandlerToConsumerAdapter<MyCommand>>()));
                    }))
            using (IServiceBus publisherBus = ServiceBusFactory.New(sbc =>
                    {
                        sbc.ReceiveFrom("rabbitmq://localhost/publisher");
                        sbc.UseRabbitMq();
                    }))
            {
                publisherBus.Publish(new MyCommand());

                Console.ReadKey();
            }
        }
    }

    public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All where T : class, ICommand
    {
        private readonly ICommandHandler<T> _commandHandler;

        public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
        {
            _commandHandler = commandHandler;
        }

        public void Consume(T message)
        {
            _commandHandler.Handle(message);
        }
    }

    public interface ICommand { }
    public class MyCommand : ICommand { }

    public interface ICommandHandler<T> where T : class, ICommand
    {
        void Handle(T message);
    }

    public class MyCommandHandler : ICommandHandler<MyCommand>
    {
        public MyCommandHandler()
        {

        }
        public void Handle(MyCommand message)
        {
            Console.WriteLine("Handled MyCommand");
        }
    }

}
Run Code Online (Sandbox Code Playgroud)