Kia*_*Kia 5 c# concurrency multithreading rabbitmq
After a week of coding and searching forums, it seems timely to ask...
I have a C# application which processes messages sent by RabbitMQ using EventingBasicConsumer. I want to process several messages concurrently, so I have instantiated a few channels (8 in this case) on the same connection, each with a single consumer. I have then attached an event handler to each consumer's Received event. Based on everything I have read so far, this setup should allow the event handler to be triggered concurrently by the consumers, each running in its own thread. But in my case the consumers receive messages sequentially: each one gets a message only after the previous consumer has acknowledged its own.
Has anyone else experienced this behavior? Is my understanding correct that the processing should technically be concurrent in this case?
Below is some basic code to better illustrate the issue:
void Initialise()
{
    // One channel per consumer, all sharing the same connection.
    ConsumerChannels_ = new IModel[ConsumerCount_];
    Consumers_ = new EventingBasicConsumer[ConsumerCount_];
    for (int i = 0; i < ConsumerCount_; ++i)
    {
        ConsumerChannels_[i] = Connection_.CreateModel();
        Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
        Consumers_[i].Received += MessageReceived;
    }
}

void MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");
    // do some time consuming processing here
    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}
What I expect to see is something like: // concurrent processing
Consumer 1: processing started...
Consumer 2: processing started...
Consumer 3: processing started...
...
Consumer 6: processing ended.
Consumer 7: processing ended.
Consumer 8: processing ended.
But what I get instead is: // sequential processing
Consumer 1: processing started...
Consumer 1: processing ended.
Consumer 2: processing started...
Consumer 2: processing ended.
...
Consumer 8: processing started...
Consumer 8: processing ended.
Any ideas on how to proceed would be most appreciated.
There are a couple of ways to do this:
Increase concurrency by adding your own thread pool inside the handler:
void MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");
    // do some time consuming processing here
    // Put your thread pool here and process the message inside a pooled thread
    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}
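Note: BasicAck can be called from a different thread, as long as no two threads use the same channel (IModel) at the same time.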
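One way to read the thread-pool suggestion is to hand each delivery to the .NET thread pool with Task.Run and acknowledge from the worker. The following is a minimal sketch, not the answerer's exact code. It assumes RabbitMQ.Client 6.x, where Received is an EventHandler<BasicDeliverEventArgs> and e.Body is a ReadOnlyMemory<byte> that should be copied before the handler returns. The names ConsumerExtensions, AttachConcurrentHandler, channelLock and process are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

static class ConsumerExtensions
{
    // Hypothetical helper: runs `process` on the thread pool for every delivery
    // and acknowledges on the same channel once processing has finished.
    public static void AttachConcurrentHandler(
        this EventingBasicConsumer consumer,
        IModel channel,
        Action<byte[]> process)
    {
        // Channels are not safe for simultaneous use by several threads,
        // so every ack/nack from a worker goes through this lock.
        var channelLock = new object();

        consumer.Received += (sender, e) =>
        {
            // Copy what the worker needs before returning from the handler.
            byte[] body = e.Body.ToArray();
            ulong tag = e.DeliveryTag;

            Task.Run(() =>
            {
                bool succeeded;
                try
                {
                    process(body); // the time-consuming work
                    succeeded = true;
                }
                catch (Exception)
                {
                    succeeded = false;
                }

                lock (channelLock)
                {
                    if (succeeded)
                        channel.BasicAck(tag, false);
                    else
                        channel.BasicNack(tag, false, true); // requeue for another attempt
                }
            });
        };
    }
}
```

It could be wired up as `consumer.AttachConcurrentHandler(channel, body => DoWork(body));`, where DoWork stands in for your own processing. Because the handler now returns immediately, the broker will keep delivering messages, so you would probably want a prefetch limit (BasicQos) on the channel to cap how many run at once. Requeueing on every failure can also loop forever on a message that always fails.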
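Or
You can add more consumers to the queue. With QoS = 1 (a prefetch count of 1), messages are handed out to the consumers in round-robin fashion.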
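The second option might look roughly like the sketch below: several consumers on the same queue, each on its own channel with a prefetch count of 1, so the broker gives a consumer its next message only after the previous one is acknowledged. This assumes RabbitMQ.Client 5.x or later and a broker on localhost. The queue name "work-queue" and the simulated work are made up for illustration. Whether handlers on different channels actually run in parallel depends on the client version's dispatch model, so treat this as a starting point rather than a guarantee.

```csharp
using System;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    static void Main()
    {
        const int consumerCount = 8;            // same count as in the question
        const string queueName = "work-queue";  // hypothetical queue name

        var factory = new ConnectionFactory { HostName = "localhost" };
        using (IConnection connection = factory.CreateConnection())
        {
            var channels = new IModel[consumerCount];

            for (int i = 0; i < consumerCount; ++i)
            {
                int id = i + 1;
                IModel channel = connection.CreateModel();
                channels[i] = channel;

                // Make sure the queue exists (idempotent if settings match).
                channel.QueueDeclare(queueName, false, false, false, null);

                // Prefetch count 1: at most one unacknowledged message per consumer.
                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    Console.WriteLine("Consumer " + id + ": processing started...");
                    Thread.Sleep(1000); // stand-in for the time-consuming work
                    channel.BasicAck(e.DeliveryTag, false);
                    Console.WriteLine("Consumer " + id + ": processing ended.");
                };

                // autoAck = false, so the handler must acknowledge explicitly.
                channel.BasicConsume(queueName, false, consumer);
            }

            Console.WriteLine("Press Enter to exit.");
            Console.ReadLine();

            foreach (IModel channel in channels)
                channel.Dispose();
        }
    }
}
```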