eri*_*man 5 python unit-testing celery celery-task
我的芹菜任务有一个基类,其中on_failure实现了一个方法。
在我的测试中,我修补了任务调用的方法之一,以引发异常但从on_faliure未被调用。
基类
class BaseTask(celery.Task):
abstract = True
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("error")
Run Code Online (Sandbox Code Playgroud)
任务
@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
logic.method(a, b)
Run Code Online (Sandbox Code Playgroud)
测试
@patch('tasks.logic.method')
def test_something(self, mock):
# arrange
mock.side_effect = NotImplementedError
# act
with self.assertRaises(NotImplementedError):
multiple(1, 2)
Run Code Online (Sandbox Code Playgroud)
当运行 celery 并引发异常时,一切正常。
CELERY_ALWAYS_EAGER被激活。
我怎样才能on_faliure跑?
小智 3
来自celery GitHub 中一个问题的讨论:on_failure测试是“已经在 Celery 级别完成(验证是否调用了 on_failure)”和“编写一个测试来测试 on_failure 所做的任何事情”。您可以在方法内定义一个函数on_failure并测试它,或者on_failure像类方法一样调用:
import TestCase
from billiard.einfo import ExceptionInfo
class TestTask(TestCase):
def test_on_failure(self):
"Testing on failure method"
exc = Exception("Test")
task_id = "test_task_id"
args = ["argument 1"]
kwargs = {"key": "value"}
einfo = ExceptionInfo
# call on_failure method
multiple.on_failure(exc, task_id, args, kwargs, einfo)
# assert something appened
Run Code Online (Sandbox Code Playgroud)
ExceptionInfo是同一类型的对象celery使用;multiple是您在问题中定义的任务。
希望这可以帮助
| 归档时间: |
|
| 查看次数: |
2667 次 |
| 最近记录: |