让Celery可以看到Django测试用例数据库

drh*_*gen 6 testing django celery

当Django测试用例运行时,它会创建一个独立的测试数据库,以便在每个测试完成时回滚数据库写入.我正在尝试与Celery创建集成测试,但我无法弄清楚如何将Celery连接到这个短暂的测试数据库.在天真的设置中,保存在Django中的对象对Celery是不可见的,并且保存在Celery中的对象会无限期地保留.

这是一个示例测试用例:

import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response

class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # This object is not visible to Celery
        MyModel(id='test_object').save()

    def test_celery_integration(self):
        # This view spawns a Celery task
        # Task should see MyModel.objects.get(id='test_object'), but can't
        http_response = self.client.post('/', 'test_data', format='json')

        result = get_result_from_response(http_response)
        result.get()  # Wait for task to finish before ending test case
        # Objects saved by Celery task should be deleted, but persist
Run Code Online (Sandbox Code Playgroud)

我有两个问题:

  1. 如何让Celery可以看到Django测试用例的对象?

  2. 如何确保Celery保存的所有对象在测试完成后自动回滚?

如果无法自动执行此操作,我愿意手动清理对象,但是tearDownAPISimpleTestCase至可以回滚删除对象.

drh*_*gen 13

这可以通过在Django测试用例中启动Celery工作程序来实现.

背景

Django的内存数据库是sqlite3.正如它在Sqlite内存数据库的描述页面上所说,"[A] ll共享内存数据库的数据库连接需要在同一个进程中." 这意味着,只要Django使用内存测试数据库并且Celery在一个单独的进程中启动,从根本上不可能让Celery和Django共享一个测试数据库.

但是,celery.contrib.testing.worker.start_worker可以在同一进程中的单独线程中启动Celery工作程序.该worker可以访问内存数据库.

这假设Celery已经以Django项目的常规方式设置.

因为Django-Celery涉及一些跨线程通信,所以只有不在隔离事务中运行的测试用例才有效.测试用例必须直接SimpleTestCase从其Rest等效继承APISimpleTestCase,并将class属性设置allow_database_queriesTrue.

关键是在setUpClass方法中启动Celery工作者并在方法中TestCase关闭它tearDownClass.关键功能是celery.contrib.testing.worker.start_worker(app),它需要一个当前Celery应用程序的实例,可能是从mysite.celery.appPython 获得并返回一个Python ContextManager,它有__enter____exit__方法,必须分别调用setUpClasstearDownClass.可能有一种方法可以避免手动输入和存在ContextManager装饰器或其他东西,但我无法弄明白.这是一个示例tests.py文件:

from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase

from mysite.celery import app

class BatchSimulationTestCase(SimpleTestCase):
    allow_database_queries = True

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        # Start up celery worker
        cls.celery_worker = start_worker(app)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

        # Close worker
        cls.celery_worker.__exit__(None, None, None)

    def test_my_function(self):
        # my_task.delay() or something
Run Code Online (Sandbox Code Playgroud)

无论出于何种原因,测试工作者都会尝试使用一个被调用的任务'celery.ping',可能会在工作失败的情况下提供更好的错误消息.即使是设置perform_ping_checkFalse作为关键字参数OT start_worker还是测试了它的存在.它正在寻找的任务是celery.contrib.testing.tasks.ping.但是,默认情况下不安装此任务.它应该可以通过增加提供这项任务celery.contrib.testing,以INSTALLED_APPSsettings.py.但是,这只会使工人看到它; 而不是生成工人的代码.生成worker的代码会执行assert 'celery.ping' in app.tasks,但会失败.对此进行评论会使一切正常,但修改已安装的库并不是一个好的解决方案.我可能做错了什么,但我确定的解决方法是将简单函数复制到可以被拾取的地方app.autodiscover_tasks(),例如celery.py:

@app.task(name='celery.ping')
def ping():
    # type: () -> str
    """Simple task that just returns 'pong'."""
    return 'pong'
Run Code Online (Sandbox Code Playgroud)

现在,在运行测试时,无需启动单独的Celery进程.Celery工作程序将作为一个单独的线程在Django测试过程中启动.该worker可以看到任何内存数据库,包括默认的内存中测试数据库.要控制工作人员数量,可以使用选项start_worker,但默认情况下是单个工作人员.

  • 我收到类似 `ERROR/MainProcess] Signal handler <bound method DjangoWorkerFixup.on_worker_process_init of <celery.fixups.django.DjangoWorkerFixup object at 0x7f6b096c1c70>>raised: InterfaceError('connection already close')` 的错误。关于如何解决它有什么想法吗? (4认同)
  • ping任务问题的解决方法是在`cls.celery_worker = start_worker(app)`之前添加`app.loader.import_module('celery.contrib.testing.tasks')`或者添加`'celery.contrib.testing.tasks '`到`INSTALLED_APPS`(注意最后的`.tasks`). (2认同)