Ami*_*imi 13 spring spring-amqp
我正在测试Spring AMQP v1.4.2中的以下场景,并且在网络中断后无法重新连接:
sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROPsudo iptables -D INPUT -p tcp --destination-port 5672 -j DROP我还测试了与VM网络适配器断开而不是iptables drop相同的情况,同样的事情发生,即没有自动重新连接.有趣的是,当我尝试iptables REJECT而不是DROP时,它按预期工作,一旦我删除拒绝规则,应用程序就会重新启动,但我认为拒绝更像是服务器故障而不是网络故障.
根据参考文件:
如果MessageListener因业务异常而失败,则异常由消息侦听器容器处理,然后返回侦听另一条消息.如果失败是由连接断开(不是业务异常)引起的,则必须取消并重新启动正在为侦听器收集消息的使用者.SimpleMessageListenerContainer无缝地处理它,并留下一个日志来说明正在重新启动监听器.事实上,它无休止地循环尝试重新启动消费者,并且只有当消费者表现得非常糟糕时才会放弃.一个副作用是,如果代理在容器启动时关闭,它将继续尝试直到可以建立连接.
这是我断开连接一分钟后得到的日志:
2015-01-16 14:00:42,433 WARN [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
... 1 common frames omitted
Run Code Online (Sandbox Code Playgroud)
我在重新连接后几秒钟收到此日志消息:
2015-01-16 14:18:14,551 WARN [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out
Run Code Online (Sandbox Code Playgroud)
更新:非常奇怪的是,当我在org.springframework.amqp包上启用DEBUG登录时,重新连接成功发生,我再也无法重现这个问题了!
在没有启用调试日志记录的情况下,我尝试调试spring AMQP代码.我发现删除iptables drop后不久,SimpleMessageListenerContainer.doStop()调用方法调用shutdown()并取消所有通道.当我在doStop()上放置断点时,我也得到了这条日志消息,这似乎与原因有关:
2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer
Run Code Online (Sandbox Code Playgroud)
更新2:设置requested-heartbeat为30秒后,如答案所示,重新连接大部分时间都有效,并成功重新定义了绑定到扇出交换的独占临时队列,但它仍然无法偶尔重新连接.
在失败的极少数情况下,我在测试期间监视RabbitMQ管理控制台并观察到建立了新连接(在超时删除旧连接之后)但重新连接后未重新定义独占临时队列.客户端也没有收到任何消息.现在很难可靠地重现这个问题,因为它不常发生.我已经提供了下面的完整配置,现在包含队列声明.
更新3:即使用自动删除命名队列替换独占临时队列,偶尔也会出现相同的行为; 即重新连接后未重新定义自动删除命名队列,并且在重新启动应用程序之前不会收到任何消息.
如果有人可以帮助我,我会非常感激.
这是我依赖的Spring AMQP配置:
<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>
<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
<rabbit:bindings>
<rabbit:binding queue="control-queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
acknowledge="none"
concurrency="1"
prefetch="1">
<rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>
</rabbit:listener-container>
<rabbit:connection-factory id="connection-factory"
username="${rabbit.username}"
password="${rabbit.password}"
host="${rabbit.host}"
virtual-host="${rabbit.virtualhost}"
publisher-confirms="true"
channel-cache-size="100"
requested-heartbeat="30" />
<rabbit:admin id="admin" connection-factory="connection-factory"/>
<rabbit:queue id="qu0-id" name="qu0">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="dead-letter"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
<rabbit:bindings>
<rabbit:binding queue="qu0" pattern="p.0"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:listener-container connection-factory="connection-factory"
acknowledge="manual"
concurrency="4"
prefetch="30">
<rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>
Run Code Online (Sandbox Code Playgroud)
我只是按照描述进行了测试(Linux上的兔子iptables用于丢弃数据包)。
重新建立连接时没有日志(也许我们应该这样做)。
我建议您打开调试日志记录以查看重新连接。
编辑:
从rabbitmq文档中:
独占队列只能由当前连接访问,并且在该连接关闭时将被删除。不允许通过其他连接被动声明互斥队列。
根据您的例外:
回复代码= 405,回复文本=资源锁定-无法获得对虚拟主机'/'中锁定队列'e4288669-2422-40e6-a2ee-b99542509273'的排他访问,类ID = 50,方法-
因此,问题在于经纪人仍然认为其他连接存在。
requestedHeartbeat以便代理可以更快地检测到丢失的连接。| 归档时间: |
|
| 查看次数: |
11636 次 |
| 最近记录: |