RabbitMQ:针对消费者缓慢的大型队列限制快速生产者

Pet*_*rch 9 throttling rabbitmq

我们目前正在使用RabbitMQ,其中一个持续超快的生产者与受限于有限资源的消费者(例如,慢速MySQL插入)配对.

我们不喜欢声明队列x-max-length,因为一旦达到限制,所有消息都将被删除或死信,我们不想丢失消息.

添加更多的消费者很容易,但他们都将受到一个共享资源的限制,因此无法使用.问题仍然存在:如何减缓生产者的速度?

当然,我们可以在Redis,memcached,MySQL或生产者读取的其他内容中添加一个流控制标志,如同对类似问题的回答指出的那样,或者更好的是,生产者可以定期测试队列长度并限制自己,但是这些看起来像是我的黑客.

我主要质疑我是否有一个根本的误解.我原以为这是一个常见的场景,所以我想知道:

限制生产者的最佳做法是什么?这是如何用RabbitMQ完成的?或者你以完全不同的方式做到这一点?

背景

假设生产者实际上知道如何通过正确的输入减慢自己的速度.例如,硬件传感器或硬件随机数生成器,可以根据需要生成任意数量的事件.

在我们特定的实际案例中,我们有一个用户可以用来添加消息的API.我们不想吞噬和丢弃消息,而是通过让我们的API在队列"满"时返回错误来应用反压,因此调用者/用户知道要退回,或者让API阻塞直到消费者赶上.我们不控制我们的用户,因此无论消费者的速度有多快,我都可以创建一个更快的生产者.

我希望有类似于TCP套接字的API,其中write()可以阻止,并且select()可以使用它来确定句柄是否可写.因此要么拥有RabbitMQ API块,要么在队列已满时返回错误.

小智 2

对于 x-max-length 属性,您说过您不希望消息被丢弃或死信。我看到有一个更新为此添加了更多功能。据我所知,它在文档中指定:

“使用溢出设置来配置队列溢出行为。如果溢出设置为拒绝发布,则最近发布的消息将被丢弃。此外,如果启用了发布者确认,则将通过基本消息通知发布者拒绝发布。确认消息”

据我了解,您可以使用队列限制来拒绝来自发布者的新消息,从而向上游施加一些背压。