服务器超出范围时basic-publish的超时

Ram*_*mon 3 rabbitmq

我的情况是rabbitmq服务器空间不足,如下所示

Filesystem                       1K-blocks    Used Available Use% Mounted on
/dev/mapper/ramonubuntu--vg-root   6299376 5956336         0 100% /
Run Code Online (Sandbox Code Playgroud)

生产者向服务器发布消息(消息需要持久化),然后将永久阻止,它将等待发布的响应.当然我们应该避免服务器空间不足的情况,但有没有任何超时机制让生产者退出等待?

我尝试过心跳和SO_TIMEOUT,它们都不起作用,因为网络工作正常.以下是我的制片人.

 protected void publish(byte[] message) throws Exception {
    // ConnectionFactory can be reused between threads.
    ConnectionFactory factory = new SoTimeoutConnectionFactory();
    factory.setHost(this.getHost());
    factory.setVirtualHost("te");
    factory.setPort(5672);
    factory.setUsername("amqp");
    factory.setPassword("amqp");
    factory.setConnectionTimeout(10 * 1000);
    // doesn't help if server got out of space
    factory.setRequestedHeartbeat(1);
    final Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // declare a 'topic' type of exchange
    channel.exchangeDeclare(this.exchangeName, "topic", true);

    channel.addReturnListener(new ReturnListener() {

        @Override
        public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
            logger.warn("[X]Returned message(replyCode:" + replyCode + ",replyText:" + replyText
                    + ",exchange:" + exchange + ",routingKey:" + routingKey + ",body:" + new String(body));
        }

    });

    channel.confirmSelect();
    channel.addConfirmListener(new ConfirmListener() {

        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            logger.info("Ack: " + deliveryTag);
            // RabbitMessagePublishMain.this.release(connection);
        }

        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            logger.info("Nack: " + deliveryTag);
            // RabbitMessagePublishMain.this.release(connection);
        }

    });

    channel.basicPublish(this.exchangeName, RabbitMessageConsumerMain.EXCHANGE_NAME + ".-1", true,
            MessageProperties.PERSISTENT_BASIC, message);
    channel.waitForConfirmsOrDie(10*1000);
    // now we can close connection
    connection.close();
}
Run Code Online (Sandbox Code Playgroud)

它将阻止'channel.waitForConfirmsOrDie(10*1000);'和SotimeoutConnectionFactory,

public class SoTimeoutConnectionFactory extends ConnectionFactory {

    @Override
    protected void configureSocket(Socket socket) throws IOException {
        super.configureSocket(socket);
        socket.setSoTimeout(10 * 1000);
    }
}
Run Code Online (Sandbox Code Playgroud)

我还捕获了制作人和rabbimq之间的网络, 在此输入图像描述

请帮忙.

ean*_*son 6

您需要实现连接阻止/未阻止.

这基本上是一种通知发布者服务器资源不足的方法.这样做的好处是,一旦可以安全地再次发布,也会通知发布者.

我建议你看一下这篇文章.实现此目的的一种简单方法是使用一个标志来指示发布是否安全,如果它没有等到它.

作为一个例子,您可以看看我是如何在我的一个Python示例中实现它的.