RabbitMQ:快速生产者和缓慢的消费者

ton*_*nga 16 java multithreading producer-consumer amqp rabbitmq

我有一个应用程序使用RabbitMQ作为消息队列在两个组件之间发送/接收消息:发送方和接收方.发件人以非常快的方式发送消息.接收器接收消息然后执行一些非常耗时的任务(主要是针对非常大的数据大小的数据库写入).由于接收器需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填满队列.所以我的问题是:这会导致消息队列溢出吗?

消息使用者如下所示:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}
Run Code Online (Sandbox Code Playgroud)

消费者收到的每条消息都包含一个caseID.对于每个caseID,它会将大量数据保存到数据库中,这需要很长时间.目前只为RabbitMQ设置了一个消费者,因为生产者/消费者使用相同的队列来发布/订阅caseID.那么如何才能加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消费率吗?或者我应该使用多个消费者同时使用传入消息?或者是否存在任何异步方式让消费者异步使用消息而不等待它完成?欢迎任何建议.

Pau*_*ney 14

"这会导致消息队列溢出吗?"

是.RabbitMQ将进入"流控制"状态,以防止在队列长度增加时过多的内存消耗.它还将开始将消息持久保存到磁盘,而不是将它们保存在内存中.

"那么我怎样才能加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出"

你有2个选择:

  1. 添加更多消费者.请记住,如果选择此选项,您的数据库现在将被多个并发进程操纵.确保DB能够承受额外的压力.
  2. 增加消费渠道的QOS值.这将从队列中提取更多消息并在消费者上缓冲它们.这将增加整体处理时间; 如果缓冲了5条消息,则第5条消息将占用消息1 ... 5的处理时间.

"我应该在消费者部分使用多线程来加快消费率吗?"

除非你有一个精心设计的解决方案.向应用程序添加并行性将在消费者方面增加大量开销.您最终可能会耗尽ThreadPool或限制内存使用量.

在处理AMQP时,您确实需要考虑每个流程的业务需求,以便设计最佳解决方案.您收到的消息的时间敏感程度如何?它们是否需要持久保存到DB AS,或者对您的用户是否重要,无论该数据是否立即可用?

如果不需要立即保留数据,则可以修改应用程序,以便消费者只需从队列中删除消息并将其保存到Redis中的高速缓存集合中.引入第二个进程,然后按顺序读取和处理缓存的消息.这将确保您的队列长度不会增长到足以导致流量控制,同时防止您的数据库被写入请求轰炸,而写入请求通常比读取请求更昂贵.您的消费者现在只是从队列中删除消息,稍后由另一个进程处理.


小智 1

“那么如何加快消费者吞吐量,让消费者能够赶上生产者,避免队列中的消息溢出呢?” 这就是答案“使用多个消费者同时消费传入的消息”,使用多线程并行运行这些消费者实现原理不共享任何内容,http://www.eaipatterns.com/CompetingConsumers.html