RabbitMQ Java客户端 - 如何合理地处理异常和关闭?

bco*_*lan 10 java amqp rabbitmq

这是我目前所知的(请纠正我):

在RabbitMQ Java客户端中,IOException当存在一般网络故障(来自代理的格式错误的数据,身份验证失败,错过的心跳)时,通道上的操作将被抛出.

通道上的操作也可以抛出ShutdownSignalException未经检查的异常,通常是AlreadyClosedException在我们尝试在通道/连接关闭后对其执行操作时.

关闭过程在"网络故障,内部故障或显式本地关闭"(例如通过channel.close()或connection.close())的情况下发生.关闭事件沿着"拓扑"传播,从Connection - > Channel - > Consumer,以及当它调用Consumer的handleShutdown()方法被调用的Channel时.

用户还可以添加在关闭过程完成后调用的关闭侦听器.

这是我缺少的:

  1. 由于IOException指示网络故障,它是否也会启动关闭请求?
  2. 使用自动恢复模式如何影响关闭请求?是否会导致通道操作在尝试重新连接到通道时阻塞,或者是否仍会抛出ShutdownSignalException?

这是我现在处理异常的方式,这是一种明智的方法吗?

我的设置是我正在轮询QueueingConsumer并将任务分派给工作池.rabbitmq客户端封装在MyRabbitMQWrapper这里.当轮询队列时发生异常时,我只是优雅地关闭所有内容并重新启动客户端.当工作者发生异常时,我也只是记录它并完成工作.

我最大的担忧(与问题1相关):假设在工作者中发生IOException,则任务不会被激活.如果关闭没有发生,我现在有一个未完成的任务将永远处于不确定状态.

伪代码:

class Main {
    public static void main(String[] args) {
        while(true) {
            run();
            //Easy way to restart the client, the connection has been
            //closed so RabbitMQ will re-queue any un-acked tasks.
            log.info("Shutdown occurred, restarting in 5 seconds");
            Thread.sleep(5000);
        }
    }

    public void run() {
      MyRabbitMQWrapper rw = new MyRabbitMQWrapper("localhost");

      try {
        rw.connect();

        while(!Thread.currentThread().isInterrupted()) {
           try {
               //Wait for a message on the QueueingConsumer
               MyMessage t = rw.getNextMessage();
               workerPool.submit(new MyTaskRunnable(rw, t));
           } catch (InterruptedException | IOException | ShutdownSignalException e) {
               //Handle all AMQP library exceptions by cleaning up and returning
               log.warn("Shutting down", e);
               workerPool.shutdown();
               break;
           }
        }
      } catch (IOException e) {
        log.error("Could not connect to broker", e);
      } finally {
        try { 
            rw.close(); 
        } catch(IOException e) { 
            log.info("Could not close connection");
        }
      }
    }
}

class MyTaskRunnable implements Runnable {
    ....

    public void run() {
        doStuff();
        try {
            rw.ack(...);
        } catch (IOException | ShutdownSignalException e) {
            log.warn("Could not ack task");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)