RabbitMQ:AsyncEventingBasicConsumer 与 EventingBasicConsumer

Bri*_*sen 5 rabbitmq

我一直在做一个小示例应用程序来使用 RabbitMQ 中队列上的消息。

代码应该读取消息并调用 REST API(此处替换为Task.Delay):

static void Main(string[] args)
{
    var factory = new ConnectionFactory
    {
        Uri = new Uri("..."),
        DispatchConsumersAsync = true
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.Received += async (model, eventArgs) =>
    {
        Console.WriteLine("Doing a fake API call...");
        await Task.Delay(2000);
        Console.WriteLine("Done with fake API call!");
        channel.BasicAck(eventArgs.DeliveryTag, false);
    };
    channel.BasicConsume("myQueue", false, consumer);
}
Run Code Online (Sandbox Code Playgroud)

当我使用队列中的 5 条消息运行此应用程序时,我得到以下结果: 留言顺序

消息被一一处理,2 秒的延迟大约需要 10 秒。我本来希望看到五行,Doing a fake API call...然后是五行,Done with fake API call!总时间约为 2 秒。

在执行同步版本时,我看到完全相同的结果 - 这是预期的:

static void Main(string[] args)
{
    var factory = new ConnectionFactory
    {
        Uri = new Uri("...")
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, eventArgs) =>
    {
        Console.WriteLine("Doing a fake API call...");
        Thread.Sleep(2000);
        Console.WriteLine("Done with fake API call!");
        channel.BasicAck(eventArgs.DeliveryTag, false);
    };
    channel.BasicConsume("myQueue", false, consumer);
}
Run Code Online (Sandbox Code Playgroud)

我的问题是:使用 与AsyncEventingBasicConsumer相比有什么区别EventingBasicConsumer并且:有没有办法让消费者在等待先前消息的工作时处理其他消息?

use*_*108 10

这是一个老问题,我相信您不会还在等待答案,但我发现真正确定 RabbitMQ 行为的细节可能具有挑战性。因此,希望这里至少有一些信息能够帮助将来的人。

RMQEventingBasicConsumer和 的AsyncEventingBasicConsumer主要区别在于它们的签名,因此我们可以将处理程序附加到和 等async事件。如果您在它们之间来回切换(并进行适当的设置),您可能会注意到行为没有变化。这是因为下面的调度程序正在“异步”调用事件,但没有并发程度。ReceivedConsumerCancelledDispatchConsumersAsync

要在 RMQ 连接上启用并发消息处理,请在建立连接之前在对象ConsumerDispatchConcurrency = {>1}上进行设置。IConnectionFactory它默认为 1,这实际上是串行处理。

例如,使用您的场景,如果您设置了ConsumerDispatchConcurrency = 5,那么我怀疑您会看到您最初期望的内容。就像是:

Doing a fake API call...
Doing a fake API call...
Doing a fake API call...
Doing a fake API call...
Doing a fake API call...
Done with fake API call!
Done with fake API call!
Done with fake API call!
Done with fake API call!
Done with fake API call!
Run Code Online (Sandbox Code Playgroud)

虽然ConsumerDispatchConcurrency = 2可能看起来更像这样:

Doing a fake API call...
Doing a fake API call...
Done with fake API call!
Done with fake API call!
Doing a fake API call...
Doing a fake API call...
Done with fake API call!
Done with fake API call!
Doing a fake API call...
Done with fake API call!
Run Code Online (Sandbox Code Playgroud)