小编Pet*_*vak的帖子

反应器rabbitmq AlreadyClosedException

在我的项目中,我使用 Springboot 版本 2.1.2.RELEASE,reactor-rabbitmq 版本 1.0.0.RELEASE。我正在创建兔子接收器,订阅它并使用手动确认处理消息。但过了一段时间后,可能是工作一小时或 1-2 天后,我收到心跳丢失错误,然后通道正在关闭,我收到“com.rabbitmq.client.AlreadyClosedException:由于连接错误,连接已关闭” ;原因:java.io.IOException:连接被对等方重置”并且我的接收器不再接收消息。仅在重新启动后才有效。

Rabbit客户端有connectionFactory.setAutomaticRecoveryEnabled(true); 和connectionFactory.setTopologyRecoveryEnabled(true);

所以默认情况下它应该自动恢复,但它不起作用。

public void startReceiver(int parallelism) {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.useNio();
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    connectionFactory.setRequestedHeartbeat(10);

    Address[] addresses = {new Address("localhost")};
    ReceiverOptions receiverOptions = new ReceiverOptions()
            .connectionFactory(connectionFactory)
            .connectionSupplier(cf -> cf.newConnection(addresses, "receiver"))
            .connectionSubscriptionScheduler(Schedulers.elastic());

    Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
    receiver.consumeManualAck("test-data", new ConsumeOptions().qos(200))
    .doOnSubscribe(s -> System.out.println("Receiver started."))
    .retry()
    .parallel(parallelism)
    .runOn(Schedulers.newParallel("parallel-receiver", parallelism))
    .doOnNext(d -> processMessage(d))
    .subscribe();
}

private void processMessage(AcknowledgableDelivery message) {
    try {
        //some processing
    } catch (Exception e) {
        e.printStackTrace();
    } …
Run Code Online (Sandbox Code Playgroud)

java rabbitmq spring-boot amqp-client reactor-rabbitmq

5
推荐指数
0
解决办法
907
查看次数