兔子MQ。Java客户端。是否可以确认不在收到消息的同一线程上的消息?

Art*_*nko 5 java multithreading rabbitmq

我想获取几条消息,处理它们并在此之后将它们全部确认。所以基本上我收到一条消息,把它放在某个队列中并继续接收来自rabbit 的消息。不同的线程将使用接收到的消息监视此队列,并在数量足够时处理它们。我所能找到的关于 ack 的所有示例仅包含在同一线程上处理的一条消息的示例。像这样(来自官方文档):

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
Run Code Online (Sandbox Code Playgroud)

还有文档说:

通道实例不能在线程之间共享。应用程序应该更喜欢每个线程使用一个通道,而不是跨多个线程共享同一个通道。虽然通道上的某些操作可以安全地并发调用,但有些操作不是,并且会导致在线上不正确的帧交错。

所以我在这里很困惑。如果我正在确认一些消息,同时通道正在接收来自rabbit 的另一条消息,它是否被认为是当时的两个操作?在我看来,是的。

我试图确认来自不同线程的同一通道上的消息,它似乎有效,但文档说我不应该在线程之间共享通道。所以我试图用不同的频道在不同的线程上做确认,但它失败了,因为这个频道的交付标签是未知的。

是否可以确认不在收到消息的同一线程上的消息?

UPD 我想要的代码示例。它在 Scala 中,但我认为它很简单。

 case class AmqpMessage(envelope: Envelope, msgBody: String)

    val queue = new ArrayBlockingQueue[AmqpMessage](100)

    val consumeChannel = connection.createChannel()
    consumeChannel.queueDeclare(queueName, true, false, true, null)
    consumeChannel.basicConsume(queueName, false, new DefaultConsumer(consumeChannel) {
      override def handleDelivery(consumerTag: String,
                                  envelope: Envelope,
                                  properties: BasicProperties,
                                  body: Array[Byte]): Unit = {
        queue.put(new AmqpMessage(envelope, new String(body)))
      }
    })

    Future {
      // this is different thread
      val channel = connection.createChannel()
      while (true) {
        try {
          val amqpMessage = queue.take()
          channel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // doesn't work
          consumeChannel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // works, but seems like not thread safe
        } catch {
          case e: Exception => e.printStackTrace()
        }
      }
    }
Run Code Online (Sandbox Code Playgroud)

Dav*_*iro 1

尽管文档相当严格,但通道上的某些操作可以安全地并发调用。只要消费确认是您在通道上执行的唯一操作,您就可以在不同的线程中确认消息。

看看这个SO问题,它处理同样的事情:

RabbitMQ 和通道 Java 线程安全