RabbitMQ和渠道Java线程安全

Sim*_*ano 14 java multithreading rabbitmq

在本指南中https://www.rabbitmq.com/api-guide.html RabbitMQ人员说:

通道和并发注意事项(线程安全)

不得在线程之间共享通道实例.应用程序应该更喜欢每个线程使用一个Channel,而不是跨多个线程共享相同的Channel.虽然通道上的某些操作可以安全地同时调用,但有些操作并不会导致错误的帧交错.在线程之间共享通道也会干扰*Publisher Confirms.

线程安全非常重要,所以我尽量努力,但问题是:

我有这个从Rabbit接收消息的应用程序.收到消息后,它会对其进行处理,然后在完成时确认.应用程序可以在具有2个线程的固定线程池中同时处理2个项目.Rabbit的QOS预取设置为2,因为我不想为应用程序提供超过它在时间范围内可以处理的内容.

现在,我的消费者的handleDelivery执行以下操作:

Task run = new Task(JSON.parse(message));    
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));
Run Code Online (Sandbox Code Playgroud)

此时,您已经发现TestWrapperThread将该channel.basicAck(deliveryTag, false);调用作为最后一个操作.

通过我对文档的理解,这是不正确的并且可能有害,因为通道不是线程安全的,并且这种行为可能会搞砸了.但那我该怎么做呢?我的意思是,我有一些想法,但他们会把一切都变得更复杂,我想知道它是否真的有必要.

提前致谢

Gab*_*ele 11

我想你Channel只是为你的消费者使用而不是像发布等其他操作.

在您的情况下,唯一的潜在问题是:

channel.basicAck(deliveryTag, false);
Run Code Online (Sandbox Code Playgroud)

因为你跨两个线程调用这个,顺便说一下这个操作是安全的,如果你看到java代码:

班级ChannelN.java电话:

public void basicAck(long deliveryTag, boolean multiple)
   throws IOException
{
   transmit(new Basic.Ack(deliveryTag, multiple));
}
Run Code Online (Sandbox Code Playgroud)

请参阅ChannelN.java的github代码

transmitAMQChannel内部的方法使用:

public void transmit(Method m) throws IOException {
   synchronized (_channelMutex) {
       transmit(new AMQCommand(m));
   }
}
Run Code Online (Sandbox Code Playgroud)

_channelMutex 是一个 protected final Object _channelMutex = new Object();

与班级一起创建. 请参阅AMQChannel.java的github代码

编辑

正如你可以在官方文档中看到的那样,"一些"操作是线程安全的,现在还不清楚哪些操作.我研究了代码,我认为在更多线程上调用ACK没有问题.

希望能帮助到你.

EDIT2 我还添加了Nicolas的评论:

请注意,从多个线程中使用(basicConsume)和acking是java客户端已经使用的常见rabbitmq模式.

所以你可以安全地使用它.

  • 谢谢大家的帮助.在这一点上,我所要做的就是试图找出RabbitMQ上下文中实际上是什么帧交错.我发现了它.每次发布非空消息时,您的客户端都会在线路上发送3个(或更多)帧:`[method:basic.publish] [内容标题] [内容正文] +`当从多个线程在同一个频道上发布时,你可能最终得到错误的交错,例如`[method:basic.publish] [内容标题] [方法:basic.publish] [内容正文] [内容标题] [内容正文]`全部清除,谢谢! (6认同)