重试丢失或失败的任务(Celery,Django和RabbitMQ)

Jul*_*pov 16 rabbitmq celery django-celery

有没有办法确定是否有任何任务丢失并重试?


我认为丢失的原因可能是调度程序错误或工作线程崩溃.

我打算重试它们,但我不确定如何确定哪些任务需要退役?

以及如何自动完成此过程?我可以使用自己的自定义调度程序来创建新任务吗?

编辑:我从文档中发现RabbitMQ从未松散任务,但是当工作线程在任务执行过程中崩溃时会发生什么?

Mau*_*cco 30

你需要的是设置

CELERY_ACKS_LATE = True

迟到的ack意味着任务消息将在执行任务之后被确认,而不仅仅是之前,这是默认行为.通过这种方式,如果工作人员崩溃,兔子MQ仍然会收到消息.

显然,总崩溃(Rabbit + worker)同时无法恢复任务,除非您实现了任务启动和任务结束的日志记录.我个人每次在任务开始时写一行mongodb,在任务完成时写另一行(独立形成结果),这样我就可以通过分析mongo日志知道哪个任务被中断了.

您可以通过覆盖方法__call__after_return芹菜基础任务类来轻松完成.

下面你看到我的一段代码使用一个taskLogger类作为上下文管理器(带有入口和出口点).taskLogger类只是在mongodb实例中写入包含任务信息的行.

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers    

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)
Run Code Online (Sandbox Code Playgroud)

我希望这可以有所帮助

  • 关于 `CELERY_ACKS_LATE=True` 的 2 个疑问 [1] `Celery` 如何(如果有的话)确保同一个 `task` 不会被多个 `worker`s 接收?[2] 如果 `Celery` [任务理想情况下应该是 `idempotent`](http://docs.celeryproject.org/en/latest/userguide/tasks.html#tasks),那么有什么 [问题](http://docs.celeryproject.org/en/latest/userguide/tasks.html#tasks) /celery.readthedocs.io/en/v2.2.6/userguide/tasks.html#Task.acks_late) 运行多次?(对于第二个问题,实际上 [here](http://docs.celeryproject.org/en/v2.2.4/faq.html#should-i-use-retry-or-acks-late)他们说没关系,但我正在寻找明确的肯定) (2认同)
  • 即使使用 ack_late,代理也知道消息已被选取,因此永远不会从其他工作人员处选取它。 (2认同)