I have a Spring Boot application using spring-boot version 1.5.16.RELEASE, which pulls the following dependencies into my application:
[INFO] | +- com.rabbitmq:http-client:jar:1.0.0.RELEASE:compile
[INFO] | +- org.springframework.amqp:spring-rabbit:jar:1.7.10.RELEASE:compile
[INFO] | | +- org.springframework.amqp:spring-amqp:jar:1.7.10.RELEASE:compile
[INFO] | | \- com.rabbitmq:amqp-client:jar:4.8.1:compile
[INFO] +- org.springframework.boot:spring-boot-starter-amqp:jar:1.5.16.RELEASE:compile
The application also uses RabbitMQ server 3.6.6 as the message broker.
On rare occasions I see the following error in my application:
org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:67)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1208)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1197)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:562)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:545)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:515)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:497)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1300(CachingConnectionFactory.java:102)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1213)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1443)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1419)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:713)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:707)
...
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:899)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:790)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1381)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1294)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1550)
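at …
In my project I use Spring Boot version 2.1.2.RELEASE and reactor-rabbitmq version 1.0.0.RELEASE. I create a Rabbit receiver, subscribe to it, and process messages with manual acknowledgement. After some time, anywhere from an hour to 1-2 days of running, I get a missed-heartbeat error, then the channel is closed and I get "com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.IOException: Connection reset by peer", and my receiver no longer receives messages. It only works again after a restart.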
The Rabbit client has connectionFactory.setAutomaticRecoveryEnabled(true); and connectionFactory.setTopologyRecoveryEnabled(true);
so by default it should recover automatically, but it does not.
public void startReceiver(int parallelism) {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.useNio();
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    // requested heartbeat timeout, in seconds
    connectionFactory.setRequestedHeartbeat(10);
    Address[] addresses = {new Address("localhost")};
    ReceiverOptions receiverOptions = new ReceiverOptions()
            .connectionFactory(connectionFactory)
            .connectionSupplier(cf -> cf.newConnection(addresses, "receiver"))
            .connectionSubscriptionScheduler(Schedulers.elastic());
    Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
    receiver.consumeManualAck("test-data", new ConsumeOptions().qos(200))
            .doOnSubscribe(s -> System.out.println("Receiver started."))
            // resubscribe on any error, without limit
            .retry()
            .parallel(parallelism)
            .runOn(Schedulers.newParallel("parallel-receiver", parallelism))
            .doOnNext(d -> processMessage(d))
            .subscribe();
}
private void processMessage(AcknowledgableDelivery message) {
    try {
        //some processing
    } catch (Exception e) {
        e.printStackTrace();
    } …
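The .retry() call in the pipeline above resubscribes immediately and without limit whenever the flux signals an error. Pausing between attempts is a common alternative while a broker connection is being re-established. The sketch below illustrates that idea with bounded retries and capped exponential backoff, using only the JDK. BackoffRetry, delayMillis, retry and the simulated failure are hypothetical names chosen for illustration; they are not part of reactor-rabbitmq or the RabbitMQ Java client, and whether this approach resolves the dropped-connection problem described above is not established here.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class BackoffRetry {

    /** Delay before retrying after the given 1-based attempt: base * 2^(attempt-1), capped at maxMillis. */
    public static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /** Runs the task, retrying on IOException until it succeeds or maxAttempts calls have been made. */
    public static <T> T retry(Callable<T> task, int maxAttempts, long baseMillis, long maxMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return task.call();
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last failure
                }
                Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical task: fails twice with an I/O error, then succeeds.
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated connection reset");
            }
            return "connected";
        }, 5, 10, 100);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Reactor has backoff-capable retry operators of its own; which ones are available depends on the reactor-core version in use.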