以指数退避重试Celery任务

Qui*_*Par 65 python celery django-celery

对于这样的任务:

from celery.decorators import task

@task()
def add(x, y):
    if not x or not y:
        raise Exception("test error")
    return self.wait_until_server_responds(
Run Code Online (Sandbox Code Playgroud)

如果它抛出异常并且我想从守护进程端重试它,那么如何应用指数退避算法,即在等待2^2, 2^3,2^4秒之后?

也是从服务器端维护的重试,这样如果工作人员碰巧被杀死,那么产生的下一个工作人员将进行重试任务?

ask*_*sol 124

task.request.retries属性包含到目前为止的尝试次数,因此您可以使用它来实现指数后退:

from celery.task import task

@task(bind=True, max_retries=3)
def update_status(self, auth, status):
    try:
        Twitter(auth).update_status(status)
    except Twitter.WhaleFail as exc:
        self.retry(exc=exc, countdown=2 ** self.request.retries)
Run Code Online (Sandbox Code Playgroud)

为防止雷鸣群问题,您可以考虑在指数退避时添加随机抖动:

import random
self.retry(exc=exc, countdown=int(random.uniform(2, 4) ** self.request.retries))
Run Code Online (Sandbox Code Playgroud)

  • 对于芹菜3.1,你应该使用`@task(bind = True)`和celery将`self`作为第一个参数传递给函数,所以你要将args改为`def update_status(self,auth,status): `然后,您可以访问`self.retries` (8认同)
  • 据我所知,倒计时属性为MQ后端的任务设置了一个eta(例如RabbitMQ).所以它不是在客户端设置的. (2认同)
  • 谢谢@robbyt!只是一个小的修正 - [`retries`是`request`的一个属性](http://celery.readthedocs.org/en/latest/userguide/tasks.html#context),所以`self.request.retries`是正确的电话. (2认同)

Rup*_*ier 21

从Celery 4.2开始,您可以将任务配置为自动使用指数退避:http://docs.celeryproject.org/en/master/userguide/tasks.html#automatic-retry-for-known-exceptions

@app.task(autoretry_for=(Exception,), retry_backoff=2)
def add(x, y):
    ...
Run Code Online (Sandbox Code Playgroud)

(这已经在Celery 4.1的文档中,但实际上当时没有发布,请参阅合并请求)

  • 好的捕获,在4.1.0中摸不着头脑,为什么我的"retry_backoff"参数不受尊重. (3认同)
  • @kororo它似乎不适用于`self.retry`,仅适用于其他异常类型 (2认同)