RabbitMQ在c#客户端上手动确认

Mat*_*Tap 5 c# rabbitmq

我试图在一个非常简单的控制台应用程序上使用手动ACK,但我无法使其工作.

在发件人上,我有以下代码:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "task_queue",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);

    channel.ConfirmSelect();
    channel.BasicAcks += (sender, e) =>
    {
        Console.Write("ACK received");
    };

    var properties = channel.CreateBasicProperties();

    channel.BasicPublish(exchange: "",
                         routingKey: "task_queue",
                         basicProperties: properties,
                         body: body);

    Console.WriteLine(" [x] Sent {0}", message);
}

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)

在接收器上我有以下代码:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "task_queue",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    channel.ConfirmSelect();

    Console.WriteLine(" [*] Waiting for messages.");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(" [x] Received {0}", message);

        int dots = message.Split('.').Length - 1;
        Thread.Sleep(dots * 1000);

        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        Console.WriteLine(" [x] Done");
    };
    channel.BasicConsume(queue: "task_queue",
                         noAck: false,
                         consumer: consumer);

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)

我期待的是,BasicAcks当我调用channel.BasicAck()接收器时,发送方上的事件被触发,但是在将消息传递给客户端之前,该事件正在被触发consumer.Received.

是我期待的正确行为还是我错过了什么?

Evk*_*Evk 8

你的期望是不正确的.BasicAcks是关于出版商确认,而不是关于来自接收者的确认.因此,您向代理和代理发布消息(因此,RabbitMQ本身)将在处理此消息时确认或拒绝(否定确认)(例如,当它将其写入磁盘以获取持久消息时,或者将其写入队列).请注意,此处不涉及接收器 - 它完全在发布者和RabbitMQ之间.

现在,当你在接收器上发出Ack消息时 - 再次只在接收器和RabbitMQ之间 - 你告诉兔子消息被处理并且可以安全删除.这样做是为了处理接收器在处理期间崩溃的情况 - 然后兔子将能够将此消息传递给下一个接收器(如果有的话).

请注意,这种结构的整个目的是将发布者和接收者分开 - 它们不应相互依赖.

如果你有一个接收器(可能有很多)并且你想确保它处理你的消息 - 使用RPC模式:发送消息并等待来自该接收器的另一个消息.