为单个单元测试用例更改 celery 设置 task_always_eager

Big*_*Boy 7 python celery pytest flask

我有一个特定的测试用例,其中我使用了 celery 后端,因此我需要将 task_always_eager 设置为 false,否则我将收到 RuntimeError。然而,问题是我需要在设置为 true 的情况下运行所有​​其他测试用例,因此我尝试使用 celery 标记注释将 celery 配置 task_always_eager 设置为 false 仅用于此测试用例。然而,它看起来并没有做任何事情。

这是我的任务的骨架:

@pytest.mark.celery(task_always_eager=False)
def test_task(self):
   # do some stuff to start a task
   do_stuff()
   # do some stuff to get info on a task
   get_info()
   # assertions
Run Code Online (Sandbox Code Playgroud)

错误:

    def _ensure_not_eager(self):
        if self.app.conf.task_always_eager:
            raise RuntimeError(
               "Cannot retrieve result with task_always_eager enabled")
E          RuntimeError: Cannot retrieve result with task_always_eager enabled

/usr/local/lib/python3.6/site-packages/celery/backends/base.py:334: RuntimeError
Run Code Online (Sandbox Code Playgroud)

TLDR:在这个测试用例中我做错了什么将 task_always_eager 设置为 false 因为它在测试运行时没有改变它?

Tyr*_*tyk 7

Celery 的配置对象是动态可更新的,因此您可以使用以下内容更新它:

celery.conf.task_always_eager = False

或者celery.conf.CELERY_ALWAYS_EAGER = False,如果您使用的是4.0芹菜前版本

您可以在运行中、在每次测试的基础上或在“setUp”操作期间执行此操作。