Sim*_*ano 14 java multithreading rabbitmq
在本指南中https://www.rabbitmq.com/api-guide.html RabbitMQ人员说:
通道和并发注意事项(线程安全)
不得在线程之间共享通道实例.应用程序应该更喜欢每个线程使用一个Channel,而不是跨多个线程共享相同的Channel.虽然通道上的某些操作可以安全地同时调用,但有些操作并不会导致错误的帧交错.在线程之间共享通道也会干扰*Publisher Confirms.
线程安全非常重要,所以我尽量努力,但问题是:
我有这个从Rabbit接收消息的应用程序.收到消息后,它会对其进行处理,然后在完成时确认.应用程序可以在具有2个线程的固定线程池中同时处理2个项目.Rabbit的QOS预取设置为2,因为我不想为应用程序提供超过它在时间范围内可以处理的内容.
现在,我的消费者的handleDelivery执行以下操作:
Task run = new Task(JSON.parse(message));
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));
Run Code Online (Sandbox Code Playgroud)
此时,您已经发现TestWrapperThread将该channel.basicAck(deliveryTag, false);
调用作为最后一个操作.
通过我对文档的理解,这是不正确的并且可能有害,因为通道不是线程安全的,并且这种行为可能会搞砸了.但那我该怎么做呢?我的意思是,我有一些想法,但他们会把一切都变得更复杂,我想知道它是否真的有必要.
提前致谢
Gab*_*ele 11
我想你Channel
只是为你的消费者使用而不是像发布等其他操作.
在您的情况下,唯一的潜在问题是:
channel.basicAck(deliveryTag, false);
Run Code Online (Sandbox Code Playgroud)
因为你跨两个线程调用这个,顺便说一下这个操作是安全的,如果你看到java代码:
班级ChannelN.java
电话:
public void basicAck(long deliveryTag, boolean multiple)
throws IOException
{
transmit(new Basic.Ack(deliveryTag, multiple));
}
Run Code Online (Sandbox Code Playgroud)
transmit
AMQChannel内部的方法使用:
public void transmit(Method m) throws IOException {
synchronized (_channelMutex) {
transmit(new AMQCommand(m));
}
}
Run Code Online (Sandbox Code Playgroud)
_channelMutex
是一个 protected final Object _channelMutex = new Object();
与班级一起创建. 请参阅AMQChannel.java的github代码
编辑
正如你可以在官方文档中看到的那样,"一些"操作是线程安全的,现在还不清楚哪些操作.我研究了代码,我认为在更多线程上调用ACK没有问题.
希望能帮助到你.
EDIT2 我还添加了Nicolas的评论:
请注意,从多个线程中使用(basicConsume)和acking是java客户端已经使用的常见rabbitmq模式.
所以你可以安全地使用它.
归档时间: |
|
查看次数: |
3924 次 |
最近记录: |