bco*_*lan 10 java amqp rabbitmq
这是我目前所知的(请纠正我):
在RabbitMQ Java客户端中,IOException
当存在一般网络故障(来自代理的格式错误的数据,身份验证失败,错过的心跳)时,通道上的操作将被抛出.
通道上的操作也可以抛出ShutdownSignalException
未经检查的异常,通常是AlreadyClosedException
在我们尝试在通道/连接关闭后对其执行操作时.
关闭过程在"网络故障,内部故障或显式本地关闭"(例如通过channel.close()或connection.close())的情况下发生.关闭事件沿着"拓扑"传播,从Connection - > Channel - > Consumer,以及当它调用Consumer的handleShutdown()
方法被调用的Channel时.
用户还可以添加在关闭过程完成后调用的关闭侦听器.
这是我缺少的:
这是我现在处理异常的方式,这是一种明智的方法吗?
我的设置是我正在轮询QueueingConsumer并将任务分派给工作池.rabbitmq客户端封装在MyRabbitMQWrapper
这里.当轮询队列时发生异常时,我只是优雅地关闭所有内容并重新启动客户端.当工作者发生异常时,我也只是记录它并完成工作.
我最大的担忧(与问题1相关):假设在工作者中发生IOException,则任务不会被激活.如果关闭没有发生,我现在有一个未完成的任务将永远处于不确定状态.
伪代码:
class Main {
public static void main(String[] args) {
while(true) {
run();
//Easy way to restart the client, the connection has been
//closed so RabbitMQ will re-queue any un-acked tasks.
log.info("Shutdown occurred, restarting in 5 seconds");
Thread.sleep(5000);
}
}
public void run() {
MyRabbitMQWrapper rw = new MyRabbitMQWrapper("localhost");
try {
rw.connect();
while(!Thread.currentThread().isInterrupted()) {
try {
//Wait for a message on the QueueingConsumer
MyMessage t = rw.getNextMessage();
workerPool.submit(new MyTaskRunnable(rw, t));
} catch (InterruptedException | IOException | ShutdownSignalException e) {
//Handle all AMQP library exceptions by cleaning up and returning
log.warn("Shutting down", e);
workerPool.shutdown();
break;
}
}
} catch (IOException e) {
log.error("Could not connect to broker", e);
} finally {
try {
rw.close();
} catch(IOException e) {
log.info("Could not close connection");
}
}
}
}
class MyTaskRunnable implements Runnable {
....
public void run() {
doStuff();
try {
rw.ack(...);
} catch (IOException | ShutdownSignalException e) {
log.warn("Could not ack task");
}
}
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
5337 次 |
最近记录: |