我试图在一个非常简单的控制台应用程序上使用手动ACK,但我无法使其工作.
在发件人上,我有以下代码:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.ConfirmSelect();
channel.BasicAcks += (sender, e) =>
{
Console.Write("ACK received");
};
var properties = channel.CreateBasicProperties();
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)
在接收器上我有以下代码:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
channel.ConfirmSelect();
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue",
noAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)
我期待的是,BasicAcks当我调用channel.BasicAck()接收器时,发送方上的事件被触发,但是在将消息传递给客户端之前,该事件正在被触发consumer.Received.
是我期待的正确行为还是我错过了什么?
你的期望是不正确的.BasicAcks是关于出版商确认,而不是关于来自接收者的确认.因此,您向代理和代理发布消息(因此,RabbitMQ本身)将在处理此消息时确认或拒绝(否定确认)(例如,当它将其写入磁盘以获取持久消息时,或者将其写入队列).请注意,此处不涉及接收器 - 它完全在发布者和RabbitMQ之间.
现在,当你在接收器上发出Ack消息时 - 再次只在接收器和RabbitMQ之间 - 你告诉兔子消息被处理并且可以安全删除.这样做是为了处理接收器在处理期间崩溃的情况 - 然后兔子将能够将此消息传递给下一个接收器(如果有的话).
请注意,这种结构的整个目的是将发布者和接收者分开 - 它们不应相互依赖.
如果你有一个接收器(可能有很多)并且你想确保它处理你的消息 - 使用RPC模式:发送消息并等待来自该接收器的另一个消息.