fou*_*der 8 .net c# masstransit rabbitmq .net-core
我有一个 .NET 4.5.2 服务通过 MassTransit 向 RabbitMq 发布消息。
以及使用这些消息的 .NET Core 2.1 服务的多个实例。
目前,.NET 核心消费者服务的竞争实例从其他实例窃取消息。
即第一个消费消息的人将其从队列中取出,其余的服务实例不会消费它。
我希望所有实例都使用相同的消息。
我怎样才能做到这一点?
发布者服务配置如下:
builder.Register(context =>
{
MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);
return Bus.Factory.CreateUsingRabbitMq(configurator =>
{
configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
{
host.Username(***);
host.Password(***);
});
configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
configurator.Publish<MyWrapper>(x =>
{
x.AutoDelete = true;
x.Durable = true;
x.ExchangeType = true;
});
});
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();
Run Code Online (Sandbox Code Playgroud)
.NET Core Consumer Services 配置如下:
serviceCollection.AddScoped<MyWrapperConsumer>();
serviceCollection.AddMassTransit(serviceConfigurator =>
{
serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
{
hostConfigurator.Username(***);
hostConfigurator.Password(***);
});
cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
{
exchangeConfigurator.AutoDelete = true;
exchangeConfigurator.Durable = true;
exchangeConfigurator.ExchangeType = "topic";
exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
});
}));
});
serviceCollection.AddSingleton<IHostedService, BusService>();
Run Code Online (Sandbox Code Playgroud)
然后 MyWrapperConsumer 看起来像这样:
public class MyWrapperConsumer :
IConsumer<MyWrapper>
{
.
.
public MyWrapperConsumer(...) => (..) = (..);
public async Task Consume(ConsumeContext<MyWrapper> context)
{
//Do Stuff
}
}
Run Code Online (Sandbox Code Playgroud)
Chr*_*son 10
听起来您想要发布消息并让多个消费者服务实例接收它们。在这种情况下,每个服务实例都需要有自己的队列。这样,每个发布的消息都会导致一个副本被传递到每个队列。然后,每个接收端点将从其自己的队列中读取该消息并使用它。
您所做的所有过度配置都与您想要的背道而驰。要使其工作,请删除所有交换类型配置,只需为每个服务实例配置一个唯一的队列名称(您可以从主机、机器等生成它),然后在消息生产者上调用 Publish。
你可以看到RabbitMQ拓扑是如何配置的:https : //masstransit-project.com/advanced/topology/rabbitmq.html
感谢 Chris Patterson 的回答和 Alexey Zimarev 的评论,我现在相信我可以完成这项工作。
这些人指出(根据我的理解,如果我错了,请纠正我),我应该自己摆脱指定交换和队列等,并停止对我的配置如此细致。
让 MassTransit 知道要创建和发布到哪个交换,以及根据我的类型创建和绑定到该交换的队列MyWrapper。以及我的IConsumer实现类型MyWrapperConsumer。
然后为每个消费者服务提供其自己的唯一ReceiveEndpoint名称,我们最终将交换将 MyWrapper 类型的消息扇出到每个由指定的唯一名称创建的唯一队列。
所以,就我而言..
发布者服务配置相关代码行更改为:
configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
configurator.Publish<MyWrapper>(x =>
{
x.AutoDelete = true;
x.Durable = true;
x.ExchangeType = true;
});
Run Code Online (Sandbox Code Playgroud)
对此
configurator.Message<MyWrapper>(x => { });
configurator.AutoDelete = true;
Run Code Online (Sandbox Code Playgroud)
并且每个消费者服务实例配置相关代码行更改为:
cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
{
exchangeConfigurator.AutoDelete = true;
exchangeConfigurator.Durable = true;
exchangeConfigurator.ExchangeType = "topic";
exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
});
Run Code Online (Sandbox Code Playgroud)
对此:
cfg.ReceiveEndpoint(host, Environment.MachineName, queueConfigurator =>
{
queueConfigurator.AutoDelete = true;
queueConfigurator.Consumer<MyWrapperConsumer>(provider);
});
Run Code Online (Sandbox Code Playgroud)
请注意,Environment.MachineName给出每个实例的唯一队列名称
我想分享一个稍微不同的代码示例。实例ID:
指定唯一标识端点实例的标识符,该标识符附加到端点名称的末尾。
services.AddMassTransit(x => {
x.SetKebabCaseEndpointNameFormatter();
Guid instanceId = Guid.NewGuid();
x.AddConsumer<MyConsumer>()
.Endpoint(c => c.InstanceId = instanceId.ToString());
x.UsingRabbitMq((context, cfg) => {
...
cfg.ConfigureEndpoints(context);
});
});
Run Code Online (Sandbox Code Playgroud)