MassTransit - PrefetchCount 和单个消费者的多个通道的说明

Reb*_*cca 5 c# masstransit rabbitmq

我一直在研究 PreFetch,并试图找出为什么 PreFetch 在队列的管理界面上始终设置为 0。在 RabbitMQ 管理界面中,我可以看到通道上配置的预取,但看不到队列本身。我还注意到它们被注册为“全局”而不是“每个消费者”,但在我的一生中,我似乎找不到在 MassTransit 中更改它的设置,尽管我猜我有一个误解关于它是如何工作的,文档并没有帮助我获得 ELI5。

这是一个示例配置:

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
   var host = cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });

    cfg.ReceiveEndpoint(
        host,
        "TEST-QUEUE-PF",
        ec =>
        {
            ec.Consumer<MyConsumer>(context);
            ec.PrefetchCount = 50; // consumer specific
            ec.UseConcurrencyLimit(1); // consumer specific
        });

    cfg.PrefetchCount = 100; // bus control specific
    cfg.UseConcurrencyLimit(1); // bus control specific
});
Run Code Online (Sandbox Code Playgroud)

这将创建以下队列:

队列

然后查看频道,我看到以下有关预取的信息:

在此输入图像描述

如果我查看所有频道,我会看到以下内容:

在此输入图像描述

我正在努力理解每个 PrefetchCounts 的相关内容。

作为一点背景知识,我们有几个运行消费者的多核服务器(即循环,或更合适的“饥饿河马”,因为我不关心平等分配)。PrefetchCount 和 ConcurrencyLimit 的默认设置对我们来说效果不太好,因为我们的消费者有很多工作要做,并且它使数据库服务器超载,导致超时。我正在寻找一种方法来配置这些消费者,这样他们就不会这样做。

这是 MassTransit 5.5.5,因为超过此版本的任何内容都会破坏 UseSerilog() 集成,并且我找不到简单的升级路径。Erlang 和 RabbitMq 本身就是当前版本。这是 AutoFac 模块的更详细信息:

private class BusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
        builder.Register(context =>
        {
            var busSettings = context.Resolve<BusSettings>();
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(
                    new Uri(busSettings.HostAddress),
                    h =>
                    {
                        h.Username(busSettings.Username);
                        h.Password(busSettings.Password);
                    });

                cfg.ReceiveEndpoint(
                    host,
                    $"TEST-QUEUE-GLOBAL", // shared queue name for all nodes
                    ec =>
                    {
                        ec.PrefetchCount = 50;
                        ec.UseConcurrencyLimit(2);
                        ec.Consumer<MyConsumer>(context);
                        ec.EnablePriority(5);
                        ec.UseRetry(retryConfig =>
                        {
                            retryConfig
                                .Intervals(new[] { 1, 2, 4, 8, 16, 32 }
                                .Select(t => TimeSpan.FromMinutes(t))
                                .ToArray());
                            retryConfig
                                .Handle<HttpRequestException>();
                            retryConfig
                                .Handle<SwaggerException>(ex => ex.IsRetryValid());
                        });
                    });

                cfg.PrefetchCount = 100;
                cfg.UseConcurrencyLimit(2);
                cfg.UseSerilog();

                var correlationIdProvider = context.Resolve<ICorrelationProvider>();
                cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
                {
                    sendContext.CorrelationId = 
                        sendContext.CorrelationId == Guid.Empty ? 
                            correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
                }));
            });

            return busControl;
        })
        .SingleInstance()
        .As<IBusControl>()
        .As<IBus>();
    }
}
Run Code Online (Sandbox Code Playgroud)

Chr*_*son 4

首先,我假设您使用的是旧版本的 MassTransit,因为从 v6 开始已进行切换以摆脱全局预取。

其次,高预取计数与 1 的并发限制相结合将导致 (prefetchcount - 1) 条消息位于接收端点上等待处理,同时一次处理 1 条消息。因此,如果只有 50 条消息,第一个节点可能会获取所有消息,然后其他节点就会空闲,因为消息正在单个节点上等待通过瓶颈。

当前版本的 RabbitMQ 管理控制台以及通道预取如下所示:

RabbitMQ 预取计数

由于 MassTransit 仅将单个消费者置于一个通道上,因此之前的方法本质上将消费者限制为全局通道预取,但现在它更加明确。此外,新设置适用于仲裁队列,该队列不支持全局预取设置。

如果您的数据库超载,并且已经优化了数据库查询以避免锁定/阻塞,并且需要减少流量,请降低预取以接近并发限制的 140%。所以,说真的,如果你的值是 1,请将预取设置为 2。