use*_*718 9 amqp rabbitmq spring-rabbit spring-amqp
我理解如下
预取只是控制代理一次允许消费者处理多少条消息。当设置为 1 时,这意味着代理将发送 1 条消息,等待 ack,然后发送下一条。
但有关以下场景的问题:
假设预取为 200,我们有 2 个消费者空闲。经纪人收到了 150 条消息,我认为经纪人会随机选择一条消息并发送所有 150 条消息?我认为是的,它不会在消费者之间进行共享。
假设一个消费者有 100 条消息处于 unack 状态,其中一条处于空闲状态,再次预取为 200 条消息。现在我们又收到了 50 条消息,我认为代理会随机将这 50 条消息分配给任一消息?或者它不会向已经有 100 条消息尚未确认的消费者提供
如果预取是200,一个消费者得到200,监听器会阻塞该线程(springrabbitmq listner方法)发送ack直到所有200被处理吗?我认为它不会一一发送ack,而是会等到所有预取的消息都处理完毕。换句话说,如果预取为 200 并且代理发送 200 条消息,那么代理何时开始收到确认?
Gar*_*ell 15
如果有两个活跃的消费者,代理将公平地分发新消息(直到每个实例有 200 条未完成的消息)。
如果队列中有150条消息并且没有消费者在运行;第一个启动的消费者(可能)将获得全部 150 个,但是当两者都运行时,分配是公平的。
如果每个消费者有 200 条未完成的消息,代理将在每条消息被确认后按需发送新消息。消费者线程没有被“阻塞”,只是代理将不再发送消息。
默认情况下,spring 会一次确认每条消息。可以通过设置容器的batchSize
属性来更改此行为。例如,如果设置为100,则每100条记录发送一个ack;这提高了性能,但增加了失败后重复交付的风险。在这种情况下,代理将在确认后发送最多 100 条新消息。
在旧版本中,batchSize
称为txSize
.
编辑
请参阅此示例;在最近的版本中默认预取为 250。
@SpringBootApplication
public class So65201334Application {
public static void main(String[] args) {
SpringApplication.run(So65201334Application.class, args);
}
@RabbitListener(id = "foo", queues = "foo", autoStartup = "false")
@RabbitListener(id = "bar", queues = "foo", autoStartup = "false")
void listen(String in, @Header(AmqpHeaders.CONSUMER_TAG) String tag) throws InterruptedException {
System.out.println(tag);
Thread.sleep(240_000);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template, RabbitListenerEndpointRegistry registry) {
return args -> {
for (int i = 0; i < 200; i++) {
template.convertAndSend("foo", "bar");
}
registry.getListenerContainer("foo").start();
System.out.println("Hit Enter to start the second listener and send more records");
System.in.read();
registry.getListenerContainer("bar").start();
Thread.sleep(2000);
for (int i = 0; i < 200; i++) {
template.convertAndSend("foo", "bar");
}
};
}
}
Run Code Online (Sandbox Code Playgroud)
正如预期的那样,所有 200 个都发送给了第一个消费者:
当第二个消费者启动时,记录将发送给两个消费者,而不是没有积压的消费者。现在的分布如下所示:
当我将预取增加到 400 时,您可以看到新消息发送到每个消费者的 50%。
归档时间: |
|
查看次数: |
16119 次 |
最近记录: |