RabbitMQ:如何限制消费率

Ale*_*ack 8 performance message-queue rabbitmq node.js

我需要限制从rabbitmq队列中消费消息的速率.

我发现了很多建议,但大多数都提供了使用预取选项.但是这个选项不能满足我的需要.即使我将预取设置为1,速率也是大约6000条消息/秒.这对消费者来说太多了.

我需要限制每秒大约70到200条消息.这意味着每5-14ms消耗一条消息.没有同步消息.

我正在使用带有amqp.node库的Node.JS.

Ale*_*ack 9

我已经找到了解决方案。

我使用来自 npm 的模块 nanotimer 来计算延迟。

然后我计算延迟 = 1 / [message_per_second] 以纳秒为单位。

然后我使用 prefetch = 1 消费消息

然后我将真正的延迟计算为延迟 - [processing_message_time]

然后我在发送消息确认之前使超时 = 真正延迟

它完美地工作。谢谢大家

  • 只要您只有一个消费者,这就会很好地工作:当然,使用 RabbitMQ 的一大卖点是能够扩展到许多消费者。 (5认同)

Joh*_*ner 6

实现令牌桶可能有所帮助:https: //en.wikipedia.org/wiki/Token_bucket

您可以编写一个生产者,以固定的速率生成"令牌桶队列",并在消息上使用TTL(可能在一秒钟后过期?)或者只设置一个等于每秒速率的最大队列大小.接收"正常队列"消息的消费者还必须接收"令牌桶队列"消息,以便有效地处理消息以限制应用程序.

NodeJS + amqplib示例:

var queueName = 'my_token_bucket';
rabbitChannel.assertQueue(queueName, {durable: true, messageTtl: 1000, maxLength: bucket.ratePerSecond});
writeToken();

function writeToken() {
    rabbitChannel.sendToQueue(queueName, new Buffer(new Date().toISOString()), {persistent: true});
    setTimeout(writeToken, 1000 / bucket.ratePerSecond);
}
Run Code Online (Sandbox Code Playgroud)

  • 对于专用令牌桶队列的好建议,谢谢! (2认同)
  • 这是解决棘手问题的绝妙解决方案。感谢您的建议! (2认同)