如何从Celery worker中捕获自定义异常,或者阻止它以`celery.backends.base`作为前缀?

Chr*_*ris 7 python exception-handling exception celery

我的Celery任务引发了一个自定义异常NonTransientProcessingError,然后被捕获AsyncResult.get().Tasks.py:

class NonTransientProcessingError(Exception):
    pass

@shared_task()
def throw_exception():
    raise NonTransientProcessingError('Error raised by POC model for test purposes')
Run Code Online (Sandbox Code Playgroud)

在Python控制台中:

from my_app.tasks import *
r = throw_exception.apply_async()
try:
    r.get()
except NonTransientProcessingError as e:
    print('caught NonTrans in type specific except clause')
Run Code Online (Sandbox Code Playgroud)

但我的自定义异常是,虽然引发的异常是,所以我的子句失败了.my_app.tasks.NonTransientProcessingErrorAsyncResult.get()celery.backends.base.NonTransientProcessingErrorexcept

Traceback (most recent call last):
  File "<input>", line 4, in <module>
  File "/...venv/lib/python3.5/site-packages/celery/result.py", line 175, in get
    raise meta['result']
celery.backends.base.NonTransientProcessingError: Error raised by POC model for test purposes
Run Code Online (Sandbox Code Playgroud)

如果我在任务中捕获异常,它可以正常工作.只有当异常被提升到.get()调用时才重命名.

如何引发自定义异常并正确捕获?

我已经确认,当我定义一个Task类并在其on_failure方法中引发自定义异常时也会发生同样的情况.以下工作:

try:
    r.get()
except Exception as e:
    if type(e).__name__ == 'NonTransientProcessingError':
        print('specific exception identified')
    else:
        print('caught generic but not identified')
Run Code Online (Sandbox Code Playgroud)

输出:

specific exception identified
Run Code Online (Sandbox Code Playgroud)

但这不是最好的方法吗?理想情况下,我想捕获行为类别的异常超类.

我正在使用Django 1.8.6,Python 3.5和Celery 3.1.18,以及Redis 3.1.18,Python redis lib 2.10.3后端.

gus*_*ong 5

import celery
from celery import shared_task


class NonTransientProcessingError(Exception):
    pass


class CeleryTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if isinstance(exc, NonTransientProcessingError):
            """
            deal with NonTransientProcessingError
            """
            pass

    def run(self, *args, **kwargs):
        pass


@shared_task(base=CeleryTask)
def add(x, y):
    raise NonTransientProcessingError
Run Code Online (Sandbox Code Playgroud)

使用带有 on_failure 回调的基本任务来捕获自定义异常。