MassTransit - 多个消费者都可以收到相同的消息吗?

fou*_*der 8 .net c# masstransit rabbitmq .net-core

我有一个 .NET 4.5.2 服务通过 MassTransit 向 RabbitMq 发布消息。

以及使用这些消息的 .NET Core 2.1 服务的多个实例。

目前,.NET 核心消费者服务的竞争实例从其他实例窃取消息。

即第一个消费消息的人将其从队列中取出,其余的服务实例不会消费它。

我希望所有实例都使用相同的消息。

我怎样才能做到这一点?

发布者服务配置如下:

 builder.Register(context =>
            {
                MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);

                return Bus.Factory.CreateUsingRabbitMq(configurator =>
                {
                    configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
                    {
                        host.Username(***);
                        host.Password(***);
                    });
                    configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
                    configurator.Publish<MyWrapper>(x =>
                    {
                        x.AutoDelete = true;
                        x.Durable = true;
                        x.ExchangeType = true;
                    });

                });
            })
            .As<IBusControl>()
            .As<IBus>()
            .SingleInstance();
Run Code Online (Sandbox Code Playgroud)

.NET Core Consumer Services 配置如下:

        serviceCollection.AddScoped<MyWrapperConsumer>();

        serviceCollection.AddMassTransit(serviceConfigurator =>
        {
            serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
                {
                    hostConfigurator.Username(***);
                    hostConfigurator.Password(***);

                });
                cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
                {
                    exchangeConfigurator.AutoDelete = true;
                    exchangeConfigurator.Durable = true;
                    exchangeConfigurator.ExchangeType = "topic";
                    exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
                });
            }));
        });
        serviceCollection.AddSingleton<IHostedService, BusService>();
Run Code Online (Sandbox Code Playgroud)

然后 MyWrapperConsumer 看起来像这样:

public class MyWrapperConsumer :
    IConsumer<MyWrapper>
{
    .
    .

    public MyWrapperConsumer(...) => (..) = (..);

    public async Task Consume(ConsumeContext<MyWrapper> context)
    {
        //Do Stuff 
    }
}
Run Code Online (Sandbox Code Playgroud)

Chr*_*son 10

听起来您想要发布消息并让多个消费者服务实例接收它们。在这种情况下,每个服务实例都需要有自己的队列。这样,每个发布的消息都会导致一个副本被传递到每个队列。然后,每个接收端点将从其自己的队列中读取该消息并使用它。

您所做的所有过度配置都与您想要的背道而驰。要使其工作,请删除所有交换类型配置,只需为每个服务实例配置一个唯一的队列名称(您可以从主机、机器等生成它),然后在消息生产者上调用 Publish。

你可以看到RabbitMQ拓扑是如何配置的:https : //masstransit-project.com/advanced/topology/rabbitmq.html

  • 谢谢回复。我很难得到这个,这是我的错。新手。我已经尝试过我认为你所说的通过将我的消费服务接收端点更改为以下内容。但没有运气。如果您能回答我应该进行的配置更改的代码示例,我将不胜感激?无法弄清楚如何将每个队列绑定到发布者交换。谢谢 cfg.ReceiveEndpoint(host, "my.exchange.123", ExchangeConfigurator =&gt; { ExchangeConfigurator.Consumer&lt;MyWrapperConsumer&gt;(provider); }); (2认同)

fou*_*der 6

感谢 Chris Patterson 的回答和 Alexey Zimarev 的评论,我现在相信我可以完成这项工作。

这些人指出(根据我的理解,如果我错了,请纠正我),我应该自己摆脱指定交换和队列等,并停止对我的配置如此细致。

让 MassTransit 知道要创建和发布到哪个交换,以及根据我的类型创建和绑定到该交换的队列MyWrapper。以及我的IConsumer实现类型MyWrapperConsumer

然后为每个消费者服务提供其自己的唯一ReceiveEndpoint名称,我们最终将交换将 MyWrapper 类型的消息扇出到每个由指定的唯一名称创建的唯一队列。

所以,就我而言..

发布者服务配置相关代码行更改为:

    configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
            configurator.Publish<MyWrapper>(x =>
            {
                x.AutoDelete = true;
                x.Durable = true;
                x.ExchangeType = true;
            });
Run Code Online (Sandbox Code Playgroud)

对此

       configurator.Message<MyWrapper>(x => { });
       configurator.AutoDelete = true;
Run Code Online (Sandbox Code Playgroud)

并且每个消费者服务实例配置相关代码行更改为:

        cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
                {
                    exchangeConfigurator.AutoDelete = true;
                    exchangeConfigurator.Durable = true;
                    exchangeConfigurator.ExchangeType = "topic";
                    exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
                });
Run Code Online (Sandbox Code Playgroud)

对此:

        cfg.ReceiveEndpoint(host, Environment.MachineName, queueConfigurator =>
                {
                    queueConfigurator.AutoDelete = true;
                    queueConfigurator.Consumer<MyWrapperConsumer>(provider);
                });
Run Code Online (Sandbox Code Playgroud)

请注意,Environment.MachineName给出每个实例的唯一队列名称


Die*_*ner 5

我想分享一个稍微不同的代码示例。实例ID:

指定唯一标识端点实例的标识符,该标识符附加到端点名称的末尾。

  services.AddMassTransit(x => {
    x.SetKebabCaseEndpointNameFormatter();
    Guid instanceId = Guid.NewGuid();
    x.AddConsumer<MyConsumer>()
      .Endpoint(c => c.InstanceId = instanceId.ToString());

    x.UsingRabbitMq((context, cfg) => {
      ...
      cfg.ConfigureEndpoints(context);
    });
  });
Run Code Online (Sandbox Code Playgroud)