Reb*_*cca 5 c# masstransit rabbitmq
我一直在研究 PreFetch,并试图找出为什么 PreFetch 在队列的管理界面上始终设置为 0。在 RabbitMQ 管理界面中,我可以看到通道上配置的预取,但看不到队列本身。我还注意到它们被注册为“全局”而不是“每个消费者”,但在我的一生中,我似乎找不到在 MassTransit 中更改它的设置,尽管我猜我有一个误解关于它是如何工作的,文档并没有帮助我获得 ELI5。
这是一个示例配置:
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ReceiveEndpoint(
host,
"TEST-QUEUE-PF",
ec =>
{
ec.Consumer<MyConsumer>(context);
ec.PrefetchCount = 50; // consumer specific
ec.UseConcurrencyLimit(1); // consumer specific
});
cfg.PrefetchCount = 100; // bus control specific
cfg.UseConcurrencyLimit(1); // bus control specific
});
Run Code Online (Sandbox Code Playgroud)
这将创建以下队列:
然后查看频道,我看到以下有关预取的信息:
如果我查看所有频道,我会看到以下内容:
我正在努力理解每个 PrefetchCounts 的相关内容。
作为一点背景知识,我们有几个运行消费者的多核服务器(即循环,或更合适的“饥饿河马”,因为我不关心平等分配)。PrefetchCount 和 ConcurrencyLimit 的默认设置对我们来说效果不太好,因为我们的消费者有很多工作要做,并且它使数据库服务器超载,导致超时。我正在寻找一种方法来配置这些消费者,这样他们就不会这样做。
这是 MassTransit 5.5.5,因为超过此版本的任何内容都会破坏 UseSerilog() 集成,并且我找不到简单的升级路径。Erlang 和 RabbitMq 本身就是当前版本。这是 AutoFac 模块的更详细信息:
private class BusModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
builder.Register(context =>
{
var busSettings = context.Resolve<BusSettings>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ReceiveEndpoint(
host,
$"TEST-QUEUE-GLOBAL", // shared queue name for all nodes
ec =>
{
ec.PrefetchCount = 50;
ec.UseConcurrencyLimit(2);
ec.Consumer<MyConsumer>(context);
ec.EnablePriority(5);
ec.UseRetry(retryConfig =>
{
retryConfig
.Intervals(new[] { 1, 2, 4, 8, 16, 32 }
.Select(t => TimeSpan.FromMinutes(t))
.ToArray());
retryConfig
.Handle<HttpRequestException>();
retryConfig
.Handle<SwaggerException>(ex => ex.IsRetryValid());
});
});
cfg.PrefetchCount = 100;
cfg.UseConcurrencyLimit(2);
cfg.UseSerilog();
var correlationIdProvider = context.Resolve<ICorrelationProvider>();
cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
{
sendContext.CorrelationId =
sendContext.CorrelationId == Guid.Empty ?
correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
}));
});
return busControl;
})
.SingleInstance()
.As<IBusControl>()
.As<IBus>();
}
}
Run Code Online (Sandbox Code Playgroud)
首先,我假设您使用的是旧版本的 MassTransit,因为从 v6 开始已进行切换以摆脱全局预取。
其次,高预取计数与 1 的并发限制相结合将导致 (prefetchcount - 1) 条消息位于接收端点上等待处理,同时一次处理 1 条消息。因此,如果只有 50 条消息,第一个节点可能会获取所有消息,然后其他节点就会空闲,因为消息正在单个节点上等待通过瓶颈。
当前版本的 RabbitMQ 管理控制台以及通道预取如下所示:
由于 MassTransit 仅将单个消费者置于一个通道上,因此之前的方法本质上将消费者限制为全局通道预取,但现在它更加明确。此外,新设置适用于仲裁队列,该队列不支持全局预取设置。
如果您的数据库超载,并且已经优化了数据库查询以避免锁定/阻塞,并且需要减少流量,请降低预取以接近并发限制的 140%。所以,说真的,如果你的值是 1,请将预取设置为 2。