在我的项目中,我使用 Springboot 版本 2.1.2.RELEASE,reactor-rabbitmq 版本 1.0.0.RELEASE。我正在创建兔子接收器,订阅它并使用手动确认处理消息。但过了一段时间后,可能是工作一小时或 1-2 天后,我收到心跳丢失错误,然后通道正在关闭,我收到“com.rabbitmq.client.AlreadyClosedException:由于连接错误,连接已关闭” ;原因:java.io.IOException:连接被对等方重置”并且我的接收器不再接收消息。仅在重新启动后才有效。
Rabbit客户端有connectionFactory.setAutomaticRecoveryEnabled(true); 和connectionFactory.setTopologyRecoveryEnabled(true);
所以默认情况下它应该自动恢复,但它不起作用。
public void startReceiver(int parallelism) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setRequestedHeartbeat(10);
Address[] addresses = {new Address("localhost")};
ReceiverOptions receiverOptions = new ReceiverOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection(addresses, "receiver"))
.connectionSubscriptionScheduler(Schedulers.elastic());
Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
receiver.consumeManualAck("test-data", new ConsumeOptions().qos(200))
.doOnSubscribe(s -> System.out.println("Receiver started."))
.retry()
.parallel(parallelism)
.runOn(Schedulers.newParallel("parallel-receiver", parallelism))
.doOnNext(d -> processMessage(d))
.subscribe();
}
private void processMessage(AcknowledgableDelivery message) {
try {
//some processing
} catch (Exception e) {
e.printStackTrace();
} …Run Code Online (Sandbox Code Playgroud)