bst*_*ack 4 c# messaging rabbitmq
RabbitMQ 文档中声明如下
“根据经验,在线程之间共享 Channel 实例是应该避免的。应用程序应该更喜欢每个线程使用一个 Channel,而不是在多个线程之间共享相同的 Channel。”
目前我们正在查看预取计数,建议如果您的消费者数量较少并且 autoack=false,那么我们应该一次消费许多消息。然而,我们发现,如果消费者使用单个执行线程发回手动确认,则预取不会生效。但是,如果我们将消费者处理包装在任务中,我们会发现预取计数确实很重要,并且可以显着提高消费者性能。
请参阅以下示例,其中我们将消费者对消息的消费包装在 Task 对象中:
class Program
{
public static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "172.20.20.13",
UserName = "billy",
Password = "guest",
Port = 5671,
VirtualHost = "/",
Ssl = new SslOption
{
Enabled = true,
ServerName = "rabbit.blah.com",
Version = System.Security.Authentication.SslProtocols.Tls12
}
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.BasicQos(0, 100, false);
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var _result = new Task(() => {
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
System.Threading.Thread.Sleep(80);
channel.BasicAck(ea.DeliveryTag, false);
});
_result.Start();
};
channel.BasicConsume(queue: "test.queue.1", autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud)
我的问题是人们如何使用.NETrabbitmq客户端实现使用预取计数的消费者?,你必须使用某种任务手动确认吗?,安全吗?
您参考的文档适用于 Java 客户端。您应该参考此文档。
您使用的是最新版本的 .NET 客户端 ( 5.1),因此在事件处理程序中执行工作Received不会阻止处理 TCP 数据的其他线程,也不会阻止心跳 - 这两点都很好。
首先,调用channel.BasicQos(0, 1, false)意味着您的消费者一次只会从 RabbitMQ 接收一条就绪消息,并且在BasicAck调用之前不会传递另一条消息。因此,实际上没有理由在另一个线程中完成工作,因为无论如何您都不会收到其他消息。
如果您增加预取值(通过实验和运行基准测试),并且您的工作运行时间超过几毫秒,则您将必须在后台线程中完成工作。
当您在事件回调中执行工作时Received,它将阻塞用于进行该回调的线程,因为回调不是在其自己的线程上执行。因此,您可以确保您的工作非常短,或者在另一个线程中完成工作。
我刚刚花了一些时间检查 .NET 客户端代码,我很确定该IModel实例不是线程安全的。如果增加预取,您将有机会同时确认多条消息,因此我建议实现一个使用该预取的解决方案,并确保BasicAck在创建连接的同一线程上调用该解决方案。