如何在RabbitMQ中创建延迟队列?

ean*_*son 44 python queue delay rabbitmq pika

使用Python,Pika和RabbitMQ创建延迟(或停放)队列的最简单方法是什么?我见过类似的问题,但Python没有.

在设计应用程序时,我发现这是一个有用的想法,因为它允许我们限制需要重新排队的消息.

总是有可能你会收到比你能处理的更多的消息,可能是HTTP服务器很慢,或者数据库压力太大.

我还发现,在对丢失消息零容忍的情况下出现问题时非常有用,而重新排队无法处理的消息可能会解决这个问题.它也可能导致消息一次又一次排队的问题.可能导致性能问题,并记录垃圾邮件.

ean*_*son 88

我发现这在开发我的应用程序时非常有用.因为它为您提供了简单地重新排队邮件的替代方案.这可以轻松降低代码的复杂性,是RabbitMQ中众多强大的隐藏功能之一.

脚步

首先,我们需要设置两个基本通道,一个用于主队列,另一个用于延迟队列.在我最后的例子中,我包含了一些不需要的额外标志,但使代码更可靠; 比如confirm delivery,delivery_modedurable.您可以在RabbitMQ 手册中找到有关这些的更多信息.

在我们设置了通道后,我们添加了一个绑定到主通道,我们可以使用它来将消息从延迟通道发送到主队列.

channel.queue_bind(exchange='amq.direct',
                   queue='hello')
Run Code Online (Sandbox Code Playgroud)

接下来,我们需要配置延迟通道,以便在消息到期后将消息转发到主队列.

delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000,
  'x-dead-letter-exchange' : 'amq.direct',
  'x-dead-letter-routing-key' : 'hello'
})
Run Code Online (Sandbox Code Playgroud)
  • x-message-ttl (消息 - 生存时间)

    这通常用于在特定持续时间后自动删除队列中的旧消息,但通过添加两个可选参数,我们可以更改此行为,而是使用此参数以毫秒为单位确定消息将在延迟队列中保留多长时间.

  • X-死信路由键

    此变量允许我们在消息过期后将消息传输到不同的队列,而不是完全删除它的默认行为.

  • X-死信交换

    此变量确定用于将消息从hello_delay传输到hello队列的Exchange.

发布到延迟队列

当我们完成设置所有基本Pika参数后,您只需使用基本发布将消息发送到延迟队列.

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))
Run Code Online (Sandbox Code Playgroud)

执行完脚本后,您应该会在RabbitMQ管理模块中看到以下队列. 在此输入图像描述

例.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer 
# messages from our delay queue.
channel.queue_bind(exchange='amq.direct',
                   queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_delivery()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds.
  'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B.
  'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

print " [x] Sent"
Run Code Online (Sandbox Code Playgroud)

  • 当要发布的每条消息都有不同的 ttl 时会发生什么?我怎么做? (2认同)
  • @ManuelZubieta链接到上文(https://www.rabbitmq.com/ttl.html)的RabbitMQ TTL文档中“每条消息TTL”部分的警告子部分说明了如何仅从队列的开头使过期的消息过期。对于每个消息TTL,这似乎杀死了这个答案,是一个可行的解决方案。 (2认同)

fly*_*cee 16

你可以使用RabbitMQ官方插件:x-delayed-message.

首先,将ez文件下载并复制到Your_rabbitmq_root_path/plugins中

其次,启用插件(不需要重启服务器):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Run Code Online (Sandbox Code Playgroud)

最后,使用"x-delay"标题发布您的邮件,例如:

headers.put("x-delay", 5000);
Run Code Online (Sandbox Code Playgroud)

注意:

它不能确保您的消息安全,因为如果您的消息在您的rabbitmq服务器停机期间到期,遗憾的是消息丢失.所以使用这个方案时要小心.

rabbitmq-delayed-message-exchange中享受它和更多信息


Rya*_*lls 8

仅供参考,如何在Spring 3.2.x中执行此操作.

<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/>

<rabbit:queue-arguments id="delayQueueArguments">
  <entry key="x-message-ttl">
    <value type="java.lang.Long">10000</value>
  </entry>
  <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
  <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
</rabbit:queue-arguments>


<rabbit:fanout-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:fanout-exchange>
Run Code Online (Sandbox Code Playgroud)