用于芹菜单元测试的内存代理

Fac*_*ano 28 django celery kombu

我有一个用Django编写的REST API,其中和端点在发布芹菜任务时对其进行排队.响应包含任务ID,我想用它来测试任务是否已创建并获得结果.所以,我想做的事情如下:

def test_async_job():
    response = self.client.post("/api/jobs/", some_test_data, format="json")
    task_id = response.data['task_id']
    result = my_task.AsyncResult(task_id).get()
    self.assertEquals(result, ...)
Run Code Online (Sandbox Code Playgroud)

我显然不想经营一个芹菜工人来进行单元测试,我希望以某种方式嘲笑它.我不能使用CELERY_ALWAYS_EAGER因为这似乎完全绕过经纪人,防止我使用AsyncResult通过其ID来获得任务(如说在这里).

通过芹菜和kombu文档,我发现有一个内存传输单元测试,这将做我正在寻找的.我尝试覆盖BROKER_URL设置以在测试中使用它:

@override_settings(BROKER_URL='memory://')
def test_async_job():
Run Code Online (Sandbox Code Playgroud)

但行为与ampq代理相同:它会阻止测试等待结果.任何想法我怎么配置这个经纪人让它在测试中工作?

Gui*_*ent 14

您可以在设置中指定broker_backend:

if 'test' in sys.argv[1:]:
    BROKER_BACKEND = 'memory'
    CELERY_TASK_ALWAYS_EAGER = True
    CELERY_TASK_EAGER_PROPAGATES = True
Run Code Online (Sandbox Code Playgroud)

或者您可以直接在测试中使用装饰器覆盖设置

import unittest
from django.test.utils import override_settings


class MyTestCase(unittest.TestCase):

    @override_settings(CELERY_TASK_EAGER_PROPAGATES=True,
                       CELERY_TASK_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        ...
Run Code Online (Sandbox Code Playgroud)

  • `BROKER_BACKEND` 显然仅适用于 Celery 3.x (https://github.com/celery/celery/blob/v3.1.26/celery/app/utils.py#L72)。Celery 4.x/5.x 使用 `CELERY_BROKER_URL = "memory://"` (https://github.com/celery/celery/blob/5.0/celery/contrib/testing/app.py#L17) (3认同)
  • Celery 是否会选择 _namespace_ 之外的选项来选择“BROKER_BACKEND”?那么 `CELERY_BROKER_URL` 应该是什么? (2认同)

小智 10

您可以使用Kombu内存代理来运行单元测试,但是为此,您需要使用与Django服务器相同的Celery应用程序对象来启动Celery工作程序.

要使用内存代理,请将BROKER_URL设置为 memory://localhost/

然后,为了培养小芹菜工人,您可以执行以下操作:

app = <Django Celery App>

# Set the worker up to run in-place instead of using a pool
app.conf.CELERYD_CONCURRENCY = 1
app.conf.CELERYD_POOL = 'solo'

# Code to start the worker
def run_worker():
    app.worker_main()

# Create a thread and run the worker in it
import threading
t = threading.Thread(target=run_worker)
t.setDaemon(True)
t.start()
Run Code Online (Sandbox Code Playgroud)

您需要确保使用与Django芹菜应用程序实例相同的应用程序.

请注意,启动worker将打印许多内容并修改日志记录设置.


Dan*_*ley 7

这是一个TransactionTestCase与Celery 4.x一起使用的Django功能更全面的例子.

import threading

from django.test import TransactionTestCase
from django.db import connections

from myproj.celery import app  # your Celery app


class CeleryTestCase(TransactionTestCase):
    """Test case with Celery support."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        app.control.purge()
        cls._worker = app.Worker(app=app, pool='solo', concurrency=1)
        connections.close_all()
        cls._thread = threading.Thread(target=cls._worker.start)
        cls._thread.daemon = True
        cls._thread.start()

    @classmethod
    def tearDownClass(cls):
        cls._worker.stop()
        super().tearDownClass()
Run Code Online (Sandbox Code Playgroud)

请注意,这不会将您的队列名称更改为测试队列,因此如果您还在运行该应用程序,那么您也希望这样做.