
我发现这个图像与我的商务模型非常相似.我需要将消息拆分到某个队列.对于一些繁重的工作.我可以为他们添加更多的工作线程.但对于一些没有太多繁重的工作.我可以让单个消费者订阅他们的消息.但是如何在rabbitMQ中做到这一点.通过他们的文件.我刚刚发现了单队列多消费者模型.
我目前正在使用来自RabbitMQClient.dll C#客户端的EventingBasicConsumer,我们生成一个不同的线程来处理传递给消费者的每条消息.
我们遇到了一个奇怪的行为,RabbitMQ服务器有时会因错误而关闭连接missed heartbeats from client, timeout: 60s.几分钟后,客户报告错误说Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541.我也client unexpectedly closed TCP connection经常看到这个错误发生.
在某些情况下,客户端可能需要60秒以上的时间来处理一个作业请求,并且在这种情况下会发生此错误.
是否需要在60秒内处理作业?因为对于我们的流程,这可能在30秒到5分钟之间变化.
RabbitMQ服务器:3.6.6 RabbitMQ.Client.dll(C#客户端):RabbitMQ.Client.4.1.1
非常感谢对此问题的任何见解.
我是RabbitMQ的新手.我希望能够在有多个队列(要读取)的情况下处理读取消息而不会阻塞.有关如何做到这一点的任何意见?
//编辑1
public class Rabbit : IMessageBus
{
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();
ConnectionFactory factory = null;
IConnection connection = null;
IModel channel = null;
Subscription sub = null;
public void writeMessage( Measurement m1 ) {
byte[] body = Measurement.AltSerialize( m1 );
int msgCount = 1;
Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id);
string finalQueue = publishToQueue( m1.id );
while (msgCount --> 0) {
channel.BasicPublish("amq.direct", finalQueue, null, body);
}
Console.WriteLine("Done. …Run Code Online (Sandbox Code Playgroud)