我的情况是rabbitmq服务器空间不足,如下所示
Filesystem 1K-blocks Used Available Use% Mounted on
/dev/mapper/ramonubuntu--vg-root 6299376 5956336 0 100% /
Run Code Online (Sandbox Code Playgroud)
生产者向服务器发布消息(消息需要持久化),然后将永久阻止,它将等待发布的响应.当然我们应该避免服务器空间不足的情况,但有没有任何超时机制让生产者退出等待?
我尝试过心跳和SO_TIMEOUT,它们都不起作用,因为网络工作正常.以下是我的制片人.
protected void publish(byte[] message) throws Exception {
// ConnectionFactory can be reused between threads.
ConnectionFactory factory = new SoTimeoutConnectionFactory();
factory.setHost(this.getHost());
factory.setVirtualHost("te");
factory.setPort(5672);
factory.setUsername("amqp");
factory.setPassword("amqp");
factory.setConnectionTimeout(10 * 1000);
// doesn't help if server got out of space
factory.setRequestedHeartbeat(1);
final Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// declare a 'topic' type of exchange
channel.exchangeDeclare(this.exchangeName, "topic", true);
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
logger.warn("[X]Returned message(replyCode:" + replyCode + ",replyText:" + replyText
+ ",exchange:" + exchange + ",routingKey:" + routingKey + ",body:" + new String(body));
}
});
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
logger.info("Ack: " + deliveryTag);
// RabbitMessagePublishMain.this.release(connection);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
logger.info("Nack: " + deliveryTag);
// RabbitMessagePublishMain.this.release(connection);
}
});
channel.basicPublish(this.exchangeName, RabbitMessageConsumerMain.EXCHANGE_NAME + ".-1", true,
MessageProperties.PERSISTENT_BASIC, message);
channel.waitForConfirmsOrDie(10*1000);
// now we can close connection
connection.close();
}
Run Code Online (Sandbox Code Playgroud)
它将阻止'channel.waitForConfirmsOrDie(10*1000);'和SotimeoutConnectionFactory,
public class SoTimeoutConnectionFactory extends ConnectionFactory {
@Override
protected void configureSocket(Socket socket) throws IOException {
super.configureSocket(socket);
socket.setSoTimeout(10 * 1000);
}
}
Run Code Online (Sandbox Code Playgroud)
我还捕获了制作人和rabbimq之间的网络,

请帮忙.