小编eri*_*man的帖子

如何在 celery 中测试 on_failure

我的芹菜任务有一个基类,其中on_failure实现了一个方法。

在我的测试中,我修补了任务调用的方法之一,以引发异常但从on_faliure未被调用。

基类

class BaseTask(celery.Task):
    abstract = True 

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("error")
Run Code Online (Sandbox Code Playgroud)

任务

@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
    logic.method(a, b)
Run Code Online (Sandbox Code Playgroud)

测试

@patch('tasks.logic.method')
def test_something(self, mock):
    # arrange
    mock.side_effect = NotImplementedError

    # act
    with self.assertRaises(NotImplementedError):
        multiple(1, 2)
Run Code Online (Sandbox Code Playgroud)

当运行 celery 并引发异常时,一切正常。 CELERY_ALWAYS_EAGER被激活。

我怎样才能on_faliure跑?

python unit-testing celery celery-task

5
推荐指数
1
解决办法
2667
查看次数

标签 统计

celery ×1

celery-task ×1

python ×1

unit-testing ×1