在不丢失保留任务的情况下重新加载 celery 代码

boh*_*boh 5 python rabbitmq celery django-celery

我在 Celery 中保留了数百万个任务(ETA 尚未到期),每次我想更新我的 Celery 代码库时,我都必须重新启动它,这会切断与 RabbitMQ 的连接并导致 RabbitMQ 再次重新分配任务(我使用的是 late确认)。

是否可以重新加载新的代码库但仍保留我的保留任务?我正在将芹菜与 Django 一起使用。

2ps*_*2ps 4

简短的回答:是的,可以,但是您必须编写自己的队列排出逻辑。

更长的答案:当你想要进行代码更新时(并且取决于你如何处理它),你必须使用 celery远程控制 api来告诉所有工作人员停止消耗任务。RabbitMQ 代理支持远程控制 API,所以你很幸运

from my_app.celery import app
inspector = app.control.inspect()
controller = app.control

# get a list of current workers
workers = inspector.ping()
active_queues = inspector.active_queues()
all_queues = set()
for worker, queues in active_queues.items():
    for queue in queues:
        all_queues.add(queue['name'])
for queue in all_queues:
    controller.cancel_consumer(queue)
Run Code Online (Sandbox Code Playgroud)

这将阻止您的工作人员消耗任务。现在,您必须监视您的工作人员,直到他们完成处理所有活动任务。

import time
done = False
while not done:
    active_count = 0
    active = inspector.active()
    active_count = sum(map(lambda l: len(l), active.values()))
    done = active_count > 0
    if not done:  
        time.sleep(60)  # wait a minute between checks
Run Code Online (Sandbox Code Playgroud)

一旦您的工作人员完成工作,您就可以清楚地部署代码,而不必担心丢失任务。