Concurrency in RabbitMQ

Kia*_*Kia 5 c# concurrency multithreading rabbitmq

After a week of coding and searching forums, it seems timely to ask...

I have a C# application which processes messages sent by RabbitMQ using EventingBasicConsumer. I want to process several messages concurrently, so I have instantiated a few channels (8 in this case) on the same connection, each with a single consumer. I have then attached an event-handler to each consumer's Received event. Based on all my readings so far, this setup should allow the event-handler to be triggered concurrently by the consumers, each running in its own thread. But in my case consumers receive messages sequentially only after the a previous consumer acknowledges its message.

Has anyone else experienced this behavior? Is my understanding correct that the processing should technically be concurrent in this case?

Below is a basic code to better illustrate the issue:

Initialise() {
    ConsumerChannels_ = new IModel[ConsumerCount_];
    Consumers_ = new EventingBasicConsumer[ConsumerCount_];
    for (int i = 0; i < ConsumerCount_; ++i)
    {
         ConsumerChannels_[i] = Connection_.CreateModel();
         Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
         Consumers_[i].Received += MessageReceived;
    }
}

MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");         
    // do some time consuming processing here
    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}
Run Code Online (Sandbox Code Playgroud)

What I expect to see is something like: // concurrent processing

Consumer 1: processing started...

Consumer 2: processing started...

Consumer 3: processing started...

...

Consumer 6: processing ended.

Consumer 7: processing ended.

Consumer 8: processing ended.

But what I get instead is: // sequential processing

Consumer 1: processing started...

Consumer 1: processing ended.

Consumer 2: processing started...

Consumer 2: processing ended.

...

Consumer 8: processing started...

Consumer 8: processing ended.

Any ideas on how to proceed would be most appreciated.

Gab*_*ele 1

你必须采取一些方法来做到这一点:

通过在其中添加自己的线程池来增加并发性:

MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e) {
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");         
    // do some time consuming processing here
    // PUT your thread-pool here and process the messages inside the thread

    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended."); }

}
Run Code Online (Sandbox Code Playgroud)

注意:BasicAck可以在不同的线程中调用。

或者

您可以将更多消费者添加到队列中,通过使用QoS=1您可以循环消费消息