基本的RabbitMQ教程给出了如何从队列中连续检索消息的示例:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", true, consumer);
Console.WriteLine(" [*] Waiting for messages." +
"To exit press CTRL+C");
while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
}
}
Run Code Online (Sandbox Code Playgroud)
我想要做的是检索已放入队列然后停止的所有消息.
以下两个例子可以解决我的问题
要么
我意识到我可以在我的消息中加上时间戳并检查它,或者我可以设置超时值,但我想知道是否有任何内置的方法来正确地执行此操作.
提前致谢.
从评论看来,RabbitMQ似乎不适用于批处理,因此它不是为此目的而设计的.
我还注意到,在测试DequeueNoWait方法或尝试以零超时出列时,它们根本不起作用,只返回null.
以下解决方案使用QueueDeclare获取现有消息的计数,并且不需要时间戳或hacky超时:
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
var queueDeclareResponse = channel.QueueDeclare(Constants.QueueName, false, false, false, null);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(Constants.QueueName, true, consumer);
Console.WriteLine(" [*] Processing existing messages.");
for (int i = 0; i < queueDeclareResponse.MessageCount; i++)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
Console.WriteLine("Finished processing {0} messages.", queueDeclareResponse.MessageCount);
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6369 次 |
最近记录: |