你如何对Celery任务进行单元测试?

Dav*_*idM 96 python unit-testing celery

Celery文档提到在Django中测试Celery,但如果你没有使用Django,则没有解释如何测试Celery任务.你怎么做到这一点?

Fla*_*r87 53

可以使用任何unittest lib同步测试任务.在使用芹菜任务时,我正常做两个不同的测试会议.第一个(正如我建议的那样)是完全同步的,应该是确保算法做它应该做的事情.第二个会话使用整个系统(包括代理)并确保我没有序列化问题或任何其他分发,通信问题.

所以:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

而你的测试:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)
Run Code Online (Sandbox Code Playgroud)

希望有所帮助!

  • 使用“task.appl().get()”和使用 eager 标志有什么区别/优点?谢谢 (3认同)

gue*_*tli 48

我用这个:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...
Run Code Online (Sandbox Code Playgroud)

文档:http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER允许您同步运行任务,而不需要芹菜服务器.

  • 我相信上面假设模块`celeryconfig.py`存在于一个包中.请参阅http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configuration. (6认同)
  • 我认为这已经过时了 - 我得到“ImportError:没有名为 celeryconfig 的模块”。 (3认同)

sla*_*acy 28

取决于您想要测试的内容.

  • 直接测试任务代码.不要调用"task.delay(...)"从单元测试中调用"task(...)".
  • 使用CELERY_ALWAYS_EAGER.这将导致您在执行"task.delay(...)"时立即调用您的任务,因此您可以测试整个路径(但不是任何异步行为).

  • 你答案中的CELERY_ALWAYS_EAGER链接现在已经死了. (2认同)

ksi*_*ndi 23

单元测试

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
Run Code Online (Sandbox Code Playgroud)

py.test灯具

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...
Run Code Online (Sandbox Code Playgroud)

附录:让send_task尊重急切

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
Run Code Online (Sandbox Code Playgroud)


okr*_*tny 19

对于芹菜4上的人来说:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
Run Code Online (Sandbox Code Playgroud)

由于设置名称已更改,如果您选择升级需要更新,请参阅

http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names

  • 根据[官方文档](http://docs.celeryproject.org/en/latest/userguide/testing.html),使用"task_always_eager"(之前的"CELERY_ALWAYS_EAGER")不适合单元测试.相反,他们提出了一些其他很好的方法来测试您的Celery应用程序. (8认同)
  • 我只想补充一点,您不希望在单元测试中使用急切任务的原因是因为您没有测试,例如在生产中使用代码后将发生的参数序列化。 (5认同)

Aar*_*ier 13

Celery 3.0开始,CELERY_ALWAYS_EAGERDjango中设置的一种方法是:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())
Run Code Online (Sandbox Code Playgroud)


ala*_*jds 10

从 Celery v4.0 开始提供了py.test 设备来启动一个仅用于测试的 celery worker,并在完成后关闭:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: gen93553@mymachine.local (running)>
    assert myfunc.delay().wait(3)
Run Code Online (Sandbox Code Playgroud)

http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test 中描述的其他装置中,您可以通过以celery_config这种方式重新定义装置来更改 celery 默认选项:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }
Run Code Online (Sandbox Code Playgroud)

默认情况下,测试工作者使用内存代理和结果后端。如果不测试特定功能,则无需使用本地 Redis 或 RabbitMQ。

  • 对我不起作用,测试套件只是挂起。你能提供更多的背景吗?(虽然我还没有投票;))。 (2认同)

Yog*_*oge 10

使用pytest参考

def test_add(celery_worker):
    mytask.delay()
Run Code Online (Sandbox Code Playgroud)

如果您使用烧瓶,请设置应用程序配置

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'
Run Code Online (Sandbox Code Playgroud)

并在 conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application
Run Code Online (Sandbox Code Playgroud)


Dan*_*ski 6

就我而言(我假设还有许多其他人),我想要的只是使用 pytest 测试任务的内部逻辑。

TL;博士; 最终嘲笑一切(选项2


示例用例

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;
Run Code Online (Sandbox Code Playgroud)

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'
Run Code Online (Sandbox Code Playgroud)

但是,由于shared_task装饰器执行了大量 celery 内部逻辑,因此它并不是真正的单元测试。

所以,对我来说,有两个选择:

选项 1:独立的内部逻辑

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;
Run Code Online (Sandbox Code Playgroud)

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);
Run Code Online (Sandbox Code Playgroud)

这看起来很奇怪,除了降低可读性之外,它还需要手动提取和传递作为请求一部分的属性,例如,以防万一task_id您需要它,这使得逻辑不那么纯粹。

选项 2:模拟
模拟 celery 内部结构

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()
Run Code Online (Sandbox Code Playgroud)

然后,我可以模拟请求对象(同样,如果您需要请求中的内容,例如 id 或重试计数器)。

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'
Run Code Online (Sandbox Code Playgroud)

这个解决方案更加手动,但是,它为我提供了实际单元测试所需的控制,而无需重复自己,也不会丢失 celery 范围。