Ale*_*exC 1 message-queue rabbitmq java-8
每当我启动以下代码时:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "direct_logs";
channel.exchangeDeclare(exchangeName, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "red");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException{
String message = new String(body, "UTF-8");
System.out.println(message);
System.out.println("message received");
}
};
channel.basicConsume(queueName, true, consumer);
Run Code Online (Sandbox Code Playgroud)
它不会像文档中所暗示的那样开始无限循环。相反,它会立即停止。我可以消耗一段时间的唯一方法是channel.basicConsume用循环替换,如下所示:
DateTime startedAt = new DateTime();
DateTime stopAt = startedAt.plusSeconds(60);
long i=0;
try {
while (stopAt.compareTo(new DateTime()) > 0) {
channel.basicConsume(queueName, true, consumer);
i++;
}
}finally {
System.out.println(new DateTime());
System.out.println(startedAt);
System.out.println(stopAt);
System.out.println(i);
}
Run Code Online (Sandbox Code Playgroud)
必须有更好的方法来收听消息一段时间,对吗?我想念什么?它立即停止收听。
您确定要停止吗?要做的basicConsume是注册使用者以侦听特定队列,因此无需循环执行它。您只需执行一次,并且传递消息handleDelivery的实例Consumer将在消息到达时被调用。
Rabbitmq库创建的线程应阻止JVM退出。为了退出程序,您应该实际调用connection.close()
这是rabbitmq的完整接收器示例:https : //github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java
实际上和您的差不多。