scy*_*gon 8 django celery django-celery
我正在尝试为我的 Django(v. 2.2.3) 应用程序编写 Celery(v. 4.2.1) 集成测试。
周围有一堆过时的文章,但它们似乎都没有使用最新的 celery 测试文档中的内容 - https://docs.celeryproject.org/en/v4.2.1/userguide/testing.html#fixtures
似乎 Celery 带有两个用于测试的装置:celery_app并且celery_worker应该允许在测试的后台线程中实际运行 worker。
正如医生所说,我已经添加了
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'memory://',
'result_backend': 'rpc'
}
Run Code Online (Sandbox Code Playgroud)
进入我的 conftest.py
我已经包装了我的测试功能
@pytest.mark.celery_app
@pytest.mark.celery_worker
Run Code Online (Sandbox Code Playgroud)
通常我把芹菜任务用
from celery import shared_task
@shared_task
def blabla(...):
pass
Run Code Online (Sandbox Code Playgroud)
但我什至试图用
from myapp.celery import celery
@celery.task
def blabla():
pass
Run Code Online (Sandbox Code Playgroud)
还有什么......我通过apply_async提供eta参数来运行我的芹菜任务。
尝试了很多方法,但 celery 固定装置不会影响事情的工作方式,任务调用转到实际的 redis 实例,并由工作人员在单独的进程中选择(如果我运行它),因此我的assert_called失败以及我访问测试数据库中的对象。
这样它就不会使用指定的固定装置,因为它们应该出现在方法参数中并通过超过参数的数量来破坏它。
认为 Celery pytest 插件可能根本不存在,但事实并非如此 - 试图明确注册它:

但是我已经进入了它们的源代码,在prints那里添加了一些疯狂的东西,但我没有看到它们被记录下来。
scy*_*gon 12
在这里,我想通了并写了一篇文章 - https://medium.com/@scythargon/how-to-use-celery-pytest-fixtures-for-celery-intergration-testing-6d61c91775d9
主键:
@pytest.mark.usefixtures('celery_session_app')
@pytest.mark.usefixtures('celery_session_worker')
class MyTest():
def test(self):
assert mul.delay(4, 4).get(timeout=10) == 16
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2358 次 |
| 最近记录: |