使用DefaultConsumer与QueueingConsumer的RabbitMQ Java客户端

Bic*_*ick 10 messaging multithreading rabbitmq

  1. DefaultConsumer
    我的DemoConsumer继承自DefaultConsumer.
    我注意到以这种方式工作handleDelivery()是从ThreadPool调用的.
    (打印Thread.currentThread().getName()我每次都看到pool-1-thread-1/2/3/4.
    我也测试了几次,看到订单已保存.
    只是为了确保 - 因为不同线程调用句柄交付 - 它会弄乱我的订单吗?

  2. QueueingConsumer
    所有的java教程都使用QueueingConsumer来使用消息.
    在API Docs中,它被称为已弃用的类.
    我应该将我的代码更改为继承自DefaultConsumer吗?教程是否过时了?

谢谢.

Gab*_*ele 14

是的,DefaultConsumer使用可以更改的内部线程池.使用ExecutorService如:

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
Run Code Online (Sandbox Code Playgroud)

阅读http://www.rabbitmq.com/api-guide.html "高级连接选项".

正如您可以从"QueueingConsumer" 文档中读到:

因此,现在可以直接实现Consumer或扩展DefaultConsumer.

我从未使用过QueueingConsumer,因为它没有正确的事件驱动.

正如你在这里看到的:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    /// here you are blocked, waiting the next message.
    String message = new String(delivery.getBody());
}
Run Code Online (Sandbox Code Playgroud)

这种情况下的典型问题是如何关闭订阅,常见的解决方法是在本地主机中发送带标记的关闭消息.其实我不太喜欢它.

如果您扩展DefaultConsumer,则可以正确关闭订阅和频道:

public class MyConsumer extends DefaultConsumer {...}
Run Code Online (Sandbox Code Playgroud)

然后

public static void main(String[] args) {
MyConsumer consumer = new MyConsumer (channel);
String consumerTag = channel.basicConsume(Constants.queue, false, consumer);
System.out.println("press any key to terminate");
System.in.read();
channel.basicCancel(consumerTag);
channel.close();
....
Run Code Online (Sandbox Code Playgroud)

总之,你不应该担心消息顺序,因为如果一切正常,消息顺序是正确的,但我认为你不能假设它,因为如果有一些问题,你可能会丢失消息顺序.如果您绝对需要维护消息顺序,则应包括顺序标记以在消费者端重建消息顺序.

你应该扩展DefaultConsumer.