我的channel.basicConsume为何不等待消息

Ale*_*exC 1 message-queue rabbitmq java-8

每当我启动以下代码时:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "direct_logs";
    channel.exchangeDeclare(exchangeName, "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, "red");
    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException{
            String message = new String(body, "UTF-8");
            System.out.println(message);
            System.out.println("message received");
        }
    };

    channel.basicConsume(queueName, true, consumer);
Run Code Online (Sandbox Code Playgroud)

它不会像文档中所暗示的那样开始无限循环。相反,它会立即停止。我可以消耗一段时间的唯一方法是channel.basicConsume用循环替换,如下所示:

    DateTime startedAt = new DateTime();
    DateTime stopAt = startedAt.plusSeconds(60);
    long i=0;
    try {
        while (stopAt.compareTo(new DateTime()) > 0) {
            channel.basicConsume(queueName, true, consumer);
            i++;
        }
    }finally {
        System.out.println(new DateTime());
        System.out.println(startedAt);
        System.out.println(stopAt);
        System.out.println(i);
    }
Run Code Online (Sandbox Code Playgroud)

必须有更好的方法来收听消息一段时间,对吗?我想念什么?它立即停止收听。

Naz*_* K. 5

您确定要停止吗?要做的basicConsume是注册使用者以侦听特定队列,因此无需循环执行它。您只需执行一次,并且传递消息handleDelivery的实例Consumer将在消息到达时被调用。

Rabbitmq库创建的线程应阻止JVM退出。为了退出程序,您应该实际调用connection.close()

这是rabbitmq的完整接收器示例:https : //github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java

实际上和您的差不多。