boh*_*boh 5 python rabbitmq celery django-celery
我在 Celery 中保留了数百万个任务(ETA 尚未到期),每次我想更新我的 Celery 代码库时,我都必须重新启动它,这会切断与 RabbitMQ 的连接并导致 RabbitMQ 再次重新分配任务(我使用的是 late确认)。
是否可以重新加载新的代码库但仍保留我的保留任务?我正在将芹菜与 Django 一起使用。
简短的回答:是的,可以,但是您必须编写自己的队列排出逻辑。
更长的答案:当你想要进行代码更新时(并且取决于你如何处理它),你必须使用 celery远程控制 api来告诉所有工作人员停止消耗任务。RabbitMQ 代理支持远程控制 API,所以你很幸运。
from my_app.celery import app
inspector = app.control.inspect()
controller = app.control
# get a list of current workers
workers = inspector.ping()
active_queues = inspector.active_queues()
all_queues = set()
for worker, queues in active_queues.items():
for queue in queues:
all_queues.add(queue['name'])
for queue in all_queues:
controller.cancel_consumer(queue)
Run Code Online (Sandbox Code Playgroud)
这将阻止您的工作人员消耗任务。现在,您必须监视您的工作人员,直到他们完成处理所有活动任务。
import time
done = False
while not done:
active_count = 0
active = inspector.active()
active_count = sum(map(lambda l: len(l), active.values()))
done = active_count > 0
if not done:
time.sleep(60) # wait a minute between checks
Run Code Online (Sandbox Code Playgroud)
一旦您的工作人员完成工作,您就可以清楚地部署代码,而不必担心丢失任务。