Rabbit mq 预取理解

use*_*718 9 amqp rabbitmq spring-rabbit spring-amqp

我理解如下

预取只是控制代理一次允许消费者处理多少条消息。当设置为 1 时,这意味着代理将发送 1 条消息,等待 ack,然后发送下一条。

但有关以下场景的问题:

  1. 假设预取为 200,我们有 2 个消费者空闲。经纪人收到了 150 条消息,我认为经纪人会随机选择一条消息并发送所有 150 条消息?我认为是的,它不会在消费者之间进行共享。

  2. 假设一个消费者有 100 条消息处于 unack 状态,其中一条处于空闲状态,再次预取为 200 条消息。现在我们又收到了 50 条消息,我认为代理会随机将这 50 条消息分配给任一消息?或者它不会向已经有 100 条消息尚未确认的消费者提供

  3. 如果预取是200,一个消费者得到200,监听器会阻塞该线程(springrabbitmq listner方法)发送ack直到所有200被处理吗?我认为它不会一一发送ack,而是会等到所有预取的消息都处理完毕。换句话说,如果预取为 200 并且代理发送 200 条消息,那么代理何时开始收到确认?

Gar*_*ell 15

如果有两个活跃的消费者,代理将公平地分发新消息(直到每个实例有 200 条未完成的消息)。

如果队列中有150条消息并且没有消费者在运行;第一个启动的消费者(可能)将获得全部 150 个,但是当两者都运行时,分配是公平的。

如果每个消费者有 200 条未完成的消息,代理将在每条消息被确认后按需发送新消息。消费者线程没有被“阻塞”,只是代理将不再发送消息。

默认情况下,spring 会一次确认每条消息。可以通过设置容器的batchSize属性来更改此行为。例如,如果设置为100,则每100条记录发送一个ack;这提高了性能,但增加了失败后重复交付的风险。在这种情况下,代理将在确认后发送最多 100 条新消息。

在旧版本中,batchSize称为txSize.

编辑

请参阅此示例;在最近的版本中默认预取为 250。

@SpringBootApplication
public class So65201334Application {

    public static void main(String[] args) {
        SpringApplication.run(So65201334Application.class, args);
    }

    @RabbitListener(id = "foo", queues = "foo", autoStartup = "false")
    @RabbitListener(id = "bar", queues = "foo", autoStartup = "false")
    void listen(String in, @Header(AmqpHeaders.CONSUMER_TAG) String tag) throws InterruptedException {
        System.out.println(tag);
        Thread.sleep(240_000);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, RabbitListenerEndpointRegistry registry) {
        return args -> {
            for (int i = 0; i < 200; i++) {
                template.convertAndSend("foo", "bar");
            }
            registry.getListenerContainer("foo").start();
            System.out.println("Hit Enter to start the second listener and send more records");
            System.in.read();
            registry.getListenerContainer("bar").start();
            Thread.sleep(2000);
            for (int i = 0; i < 200; i++) {
                template.convertAndSend("foo", "bar");
            }
        };
    }

}
Run Code Online (Sandbox Code Playgroud)

正如预期的那样,所有 200 个都发送给了第一个消费者:

在此输入图像描述

当第二个消费者启动时,记录将发送给两个消费者,而不是没有积压的消费者。现在的分布如下所示:

在此输入图像描述

当我将预取增加到 400 时,您可以看到新消息发送到每个消费者的 50%。

在此输入图像描述