如何通过 .NET RabbitMQ 客户端利用消费者的预取计数

bst*_*ack 4 c# messaging rabbitmq

RabbitMQ 文档中声明如下

“根据经验,在线程之间共享 Channel 实例是应该避免的。应用程序应该更喜欢每个线程使用一个 Channel,而不是在多个线程之间共享相同的 Channel。”

目前我们正在查看预取计数,建议如果您的消费者数量较少并且 autoack=false,那么我们应该一次消费许多消息。然而,我们发现,如果消费者使用单个执行线程发回手动确认,则预取不会生效。但是,如果我们将消费者处理包装在任务中,我们会发现预取计数确实很重要,并且可以显着提高消费者性能。

请参阅以下示例,其中我们将消费者对消息的消费包装在 Task 对象中:

class Program
{
    public static void Main()
    {
        var factory = new ConnectionFactory()
        {
            HostName = "172.20.20.13",
            UserName = "billy",
            Password = "guest",
            Port = 5671,
            VirtualHost = "/",
            Ssl = new SslOption
            {
                Enabled = true,
                ServerName = "rabbit.blah.com",
                Version = System.Security.Authentication.SslProtocols.Tls12
            }
        };
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();
        channel.BasicQos(0, 100, false);
        channel.ExchangeDeclare(exchange: "logs", type: "fanout");
        var queueName = channel.QueueDeclare().QueueName;
        Console.WriteLine(" [*] Waiting for logs.");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var _result = new Task(() => {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                System.Threading.Thread.Sleep(80);

                channel.BasicAck(ea.DeliveryTag, false);
            });
            _result.Start();
        };
        channel.BasicConsume(queue: "test.queue.1", autoAck: false, consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

我的问题是人们如何使用.NETrabbitmq客户端实现使用预取计数的消费者?,你必须使用某种任务手动确认吗?,安全吗?

Luk*_*ken 6

您参考的文档适用于 Java 客户端。您应该参考此文档

您使用的是最新版本的 .NET 客户端 ( 5.1),因此在事件处理程序中执行工作Received不会阻止处理 TCP 数据的其他线程,也不会阻止心跳 - 这两点都很好。

首先,调用channel.BasicQos(0, 1, false)意味着您的消费者一次只会从 RabbitMQ 接收一条就绪消息,并且在BasicAck调用之前不会传递另一条消息。因此,实际上没有理由在另一个线程中完成工作,因为无论如何您都不会收到其他消息。

如果您增加预取值(通过实验和运行基准测试),并且您的工作运行时间超过几毫秒,则您将必须在后台线程中完成工作。

当您在事件回调中执行工作时Received,它将阻塞用于进行该回调的线程,因为回调不是在其自己的线程上执行。因此,您可以确保您的工作非常短,或者在另一个线程中完成工作。

我刚刚花了一些时间检查 .NET 客户端代码,我很确定该IModel实例不是线程安全的。如果增加预取,您将有机会同时确认多条消息,因此我建议实现一个使用该预取的解决方案,并确保BasicAck在创建连接的同一线程上调用该解决方案。