我的芹菜任务有一个基类,其中on_failure实现了一个方法。
在我的测试中,我修补了任务调用的方法之一,以引发异常但从on_faliure未被调用。
基类
class BaseTask(celery.Task):
abstract = True
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("error")
Run Code Online (Sandbox Code Playgroud)
任务
@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
logic.method(a, b)
Run Code Online (Sandbox Code Playgroud)
测试
@patch('tasks.logic.method')
def test_something(self, mock):
# arrange
mock.side_effect = NotImplementedError
# act
with self.assertRaises(NotImplementedError):
multiple(1, 2)
Run Code Online (Sandbox Code Playgroud)
当运行 celery 并引发异常时,一切正常。
CELERY_ALWAYS_EAGER被激活。
我怎样才能on_faliure跑?