oie*_*iez 12 python django unit-testing celery
我试图在Django的单元测试框架中测试一些芹菜功能,但每当我尝试检查AsyncResult时,测试就像从未启动过一样.
我知道这个代码在RabbitMQ的真实环境中工作,所以我只是想知道为什么它在使用测试框架时不起作用.
这是一个例子:
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,
CELERY_ALWAYS_EAGER = True,
BROKER_BACKEND = 'memory',)
def test_celery_do_work(self):
result = myapp.tasks.celery_do_work.AsyncResult('blat')
applied_task = myapp.tasks.celery_do_work.apply_async((), task_id='blat')
applied_task.wait()
# THIS SUCCEEDS
self.assertTrue(applied_task.successful())
# THIS FAILS
self.assertTrue(result.successful())
Run Code Online (Sandbox Code Playgroud)
使用ALWAYS_EAGER选项是否会禁用AsyncResult功能,因为它立即执行?如果是这样,有没有办法能够单元测试AsyncResult状态检查?如果我尝试取出ALWAYS_EAGER选项,测试永远不会运行,所以我不知所措.
谢谢!
Nic*_*tot 15
当CELERY_ALWAYS_EAGER是True时,调用apply_async()实际上与更换apply().返回的结果是a EagerResult,它已包含您的任务结果.
所以,是的,设置ALWAYS_EAGER = True会禁用整个AsyncResult功能.绕过整个异步进程,并且实际上没有任务发送到代理,这就是您无法通过查找结果的原因AsyncResult.
CELERY_ALWAYS_EAGER = True在测试只需要Celery结果的代码路径时使用,并以相同的方式使用EagerResult或使用AsyncResult.
如果需要,还有运行与测试的方式AsyncResult也与CELERY_ALWAYS_EAGER = False,但对于这一点,你需要调用任务在你的测试用例之前启动工作.然后,工作人员将能够执行您的任务,并且AsyncResult可以正常工作.你可以看看django-celery-testworker,虽然我没有对它进行过测试.
| 归档时间: |
|
| 查看次数: |
4256 次 |
| 最近记录: |