在芹菜中,如何确保在工人崩溃时重试任务

aqs*_*aqs 7 scheduled-tasks redis celery celery-task django-celery

首先,请不要考虑这个问题作为一个重复 这个问题

我有一个设置环境,使用celeryredisas brokerresult_backend.我的问题是如何确保当芹菜工人崩溃时,所有计划的任务都会重新尝试,当芹菜工人回来时.

我已经看到了使用的建议CELERY_ACKS_LATE = True,以便代理将重新驱动任务,直到它得到一个ACK,但在我的情况下它不起作用.每当我安排一项任务时,它会立即发送到工作人员,该工作人员将其持续到预定的执行时间.让我举个例子:

我正在安排这样的任务:res=test_task.apply_async(countdown=600),但是在芹菜工人日志中,我可以看到类似的东西:Got task from broker: test_task[a137c44e-b08e-4569-8677-f84070873fc0] eta:[2013-01-...].现在,当我杀死芹菜工人时,这些预定的任务就会丢失.我的设置:

BROKER_URL = "redis://localhost:6379/0"  
CELERY_ALWAYS_EAGER = False  
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"  
CELERY_ACKS_LATE = True
Run Code Online (Sandbox Code Playgroud)

ode*_*fos 5

显然这就是芹菜的行为方式。当工作人员突然被杀死(但调度进程没有被杀死)时,即使您有 acks_late=True,该消息也将被视为“失败”

动机(据我理解)是,如果消费者由于内存不足而被操作系统杀死,则重新交付相同的任务是没有意义的。

您可能会在这里看到确切的问题:https ://github.com/celery/celery/issues/1628

我其实不同意这种行为。在我看来,不承认会更有意义。