Art*_*nko 5 java multithreading rabbitmq
我想获取几条消息,处理它们并在此之后将它们全部确认。所以基本上我收到一条消息,把它放在某个队列中并继续接收来自rabbit 的消息。不同的线程将使用接收到的消息监视此队列,并在数量足够时处理它们。我所能找到的关于 ack 的所有示例仅包含在同一线程上处理的一条消息的示例。像这样(来自官方文档):
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
Run Code Online (Sandbox Code Playgroud)
还有文档说:
通道实例不能在线程之间共享。应用程序应该更喜欢每个线程使用一个通道,而不是跨多个线程共享同一个通道。虽然通道上的某些操作可以安全地并发调用,但有些操作不是,并且会导致在线上不正确的帧交错。
所以我在这里很困惑。如果我正在确认一些消息,同时通道正在接收来自rabbit 的另一条消息,它是否被认为是当时的两个操作?在我看来,是的。
我试图确认来自不同线程的同一通道上的消息,它似乎有效,但文档说我不应该在线程之间共享通道。所以我试图用不同的频道在不同的线程上做确认,但它失败了,因为这个频道的交付标签是未知的。
是否可以确认不在收到消息的同一线程上的消息?
UPD 我想要的代码示例。它在 Scala 中,但我认为它很简单。
case class AmqpMessage(envelope: Envelope, msgBody: String)
val queue = new ArrayBlockingQueue[AmqpMessage](100)
val consumeChannel = connection.createChannel()
consumeChannel.queueDeclare(queueName, true, false, true, null)
consumeChannel.basicConsume(queueName, false, new DefaultConsumer(consumeChannel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: BasicProperties,
body: Array[Byte]): Unit = {
queue.put(new AmqpMessage(envelope, new String(body)))
}
})
Future {
// this is different thread
val channel = connection.createChannel()
while (true) {
try {
val amqpMessage = queue.take()
channel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // doesn't work
consumeChannel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // works, but seems like not thread safe
} catch {
case e: Exception => e.printStackTrace()
}
}
}
Run Code Online (Sandbox Code Playgroud)
尽管文档相当严格,但通道上的某些操作可以安全地并发调用。只要消费和确认是您在通道上执行的唯一操作,您就可以在不同的线程中确认消息。
看看这个SO问题,它处理同样的事情: