使用rabbitmq 的mastransit:为什么消息在发布时自动移动到_skipped 队列

0 masstransit servicebus rabbitmq

MassTransit.3.1.2 MassTransit.Autofac.3.1.1 MassTransit.RabbitMQ.3.1.1 RabbitMQ.Client.3.6.0 Topshelf.3.3.1

一个 Topshelf Windows 服务,创建一个像这样的总线实例:

var builder = new ContainerBuilder();
builder.RegisterConsumers(Assembly.GetExecutingAssembly());
builder.Register<IBusControl>(context =>
        {
            return Bus.Factory.CreateUsingRabbitMq(rbmq =>
            {
                var host = rbmq.Host(new Uri("rabbitmq://" + BusConfig.Instance.Host + ":" + BusConfig.Instance.Port + "/" + BusConfig.Instance.VHost), h =>
                {
                    h.Username(BusConfig.Instance.UserName);
                    h.Password(BusConfig.Instance.Password);
                });
                rbmq.UseJsonSerializer();
                rbmq.UseNLog();

                rbmq.ReceiveEndpoint(BusConfig.Instance.Queue, edp =>
                {
                    edp.UseRetry(Retry.Incremental(5, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)));
                    edp.LoadFrom(context);
                });
            });
        }).SingleInstance().As<IBusControl>().As<IBus>();

        return builder.Build().Resolve<IBusControl>();
Run Code Online (Sandbox Code Playgroud)

像这样的一个控制台应用程序?

var bus = Bus.Factory.CreateUsingRabbitMq(rbmq =>
           {
               var host = rbmq.Host(new Uri("rabbitmq://" + BusConfig.Instance.Host + ":" + BusConfig.Instance.Port + "/" + BusConfig.Instance.VHost), h =>
               {
                   h.Username(BusConfig.Instance.UserName);
                   h.Password(BusConfig.Instance.Password);
               });
           });
        bus.Start();
        var msg = new OrderCreatedMessage() { OrderId = 10102 };

        bus.Publish<OrderCreatedMessage>(msg);
Run Code Online (Sandbox Code Playgroud)

当由控制台应用程序发布的消息时?由“BusConfig.Instance.Queue”命名的Rabbitmq队列收到了OrderCreatedMessage?

消息发布时的Rabbitmq队列视图

然后我启动了 topshelf 服务?OrderCreatedMessage 自动删除到 _skipped 队列?

topshelf 服务启动时的 Rabbitmq 队列视图

大众运输日志是这样的:

MOVE:rabbitmq:\/\/192.168.12.217:5672\/zst\/zst.order.queue?prefetch=8:rabbitmq:\/\/192.168.12.217:5672\/zst\/zst.order.queue_skipped?bind=true&queue=zst.order.queue_skipped:N\/A:Moved
Run Code Online (Sandbox Code Playgroud)

但是,当我发布消息并使用相同的总线(Topshelf 服务)消费消息时,它起作用了!!!

对此架构的任何帮助或其他见解将不胜感激!

Chr*_*son 5

查看已发布消息类型的交换与服务端点输入队列的交换之间的绑定。确保正确绑定了正确的类型交换。由于消息正在传递,我猜这部分是正确的。

对于接收端点,似乎消费者在某一时刻是正确的(这解释了绑定存在的原因),但目前可能没有使用正确的消息类型。消息类型必须是消费者和发布者中相同的消息契约,消费者要消费的消息。

当消息移动到 _skipped 时,该端点上没有消费者实际使用消息本身中的消息类型。我建议发布以下输出以供审核:

bus.GetProbeResult().ToJsonString()
Run Code Online (Sandbox Code Playgroud)

这将显示正在注册的消费者和正在消费的消息类型。它还将极大地帮助您解决所看到的问题。