如何在Celery中设置每个消息的到期时间(TTL)?

Fuz*_*Ami 7 rabbitmq celery dead-letter

可以使用到期TTL将消息发布到RabbitMQ队列中:一旦TTL完成,这些消息将过期,并且(如果设置了死信队列)被移除到死信队列.

但是可以使用Celery指定这样的每消息TTL吗?

请注意,我不是在寻找一种指定任务过期的方法,而是消息过期:我希望我的消息在队列中花费(可配置的)一段时间,然后最终获得@死信队列.

TIA.

ill*_*nan 7

简短介绍:过期与过期

RabbitMQ 确实支持每条消息的 TTL(以及队列的 TTL),该行为记录在此处: https: //www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers。技巧是在发布消息时(以毫秒为单位)设置expiration消息属性(https://www.rabbitmq.com/publishers.html#message-properties )。

Celery on the other hand allows you to set the expires parameter (https://docs.celeryproject.org/en/stable/reference/celery.app.task.html) in seconds or as a datetime. The difference from the native RabbitMQ functionality is that the message remains in the queue after expiration. The expired message is delivered to the worker, which then reads the expires header to determine that the message has expired and rejects the message.

tl;dr: expiration != expires

How to pass a message property in Celery

This method is not documented in Celery. I figured it out by trial and error because I wanted a native TTL myself.

The send_task method (celery.app.base.Celery.send_task), which is called for example by apply_async, accepts the **options parameter. All **options unknown to Celery are then passed in the celery.app.amqp.Queues->send_task_message( ... ) method as **kwargs and then as message properties.

So if we can set the message property, there is nothing easier than setting the native expiration:

my_awesome_task.apply_async(args=(11,), expiration=42)
Run Code Online (Sandbox Code Playgroud)

RabbitMQ 管理控制台

  • Note that Celery automatically converts 42 seconds to 42000 milliseconds (which is correct).
  • Expiration (in properties) and Expires (in headers) can be combined, the two functionalities are not affected in any way.