如何在上次重试 Celery 任务时执行一些代码

Mar*_*eux 4 django celery background-task

我正在使用 Django 和 Celery 来运行我的后台任务。我有一个任务可能会因某些 IO 原因而失败:

@shared_task(bind=True)
def mytask(self, someargs):
  try:
    do_some_io_operation()
  except SomeException as e:
    self.retry(max_retries=5)

  # do some other stuff

Run Code Online (Sandbox Code Playgroud)

我只想在最后一次重试失败时执行一些代码并退出函数而不引发异常。是否可以?

Mar*_*eux 5

我实际上在另一张票中找到了答案:

@shared_task(bind=True)
def mytask(self, someargs):
  max_retries = 5
  try:
    do_some_io_operation()
  except SomeException as e:

    if (self.request.retries >= max_retries):
      # do some stuff
      return

    self.retry(max_retries=max_retries)

  # do some other stuff
Run Code Online (Sandbox Code Playgroud)