drh*_*gen 6 testing django celery
当Django测试用例运行时,它会创建一个独立的测试数据库,以便在每个测试完成时回滚数据库写入.我正在尝试与Celery创建集成测试,但我无法弄清楚如何将Celery连接到这个短暂的测试数据库.在天真的设置中,保存在Django中的对象对Celery是不可见的,并且保存在Celery中的对象会无限期地保留.
这是一个示例测试用例:
import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response
class MyTestCase(APITestCase):
@classmethod
def setUpTestData(cls):
# This object is not visible to Celery
MyModel(id='test_object').save()
def test_celery_integration(self):
# This view spawns a Celery task
# Task should see MyModel.objects.get(id='test_object'), but can't
http_response = self.client.post('/', 'test_data', format='json')
result = get_result_from_response(http_response)
result.get() # Wait for task to finish before ending test case
# Objects saved by Celery task should be deleted, but persist
Run Code Online (Sandbox Code Playgroud)
我有两个问题:
如何让Celery可以看到Django测试用例的对象?
如何确保Celery保存的所有对象在测试完成后自动回滚?
如果无法自动执行此操作,我愿意手动清理对象,但是tearDown甚APISimpleTestCase至可以回滚删除对象.
drh*_*gen 13
这可以通过在Django测试用例中启动Celery工作程序来实现.
Django的内存数据库是sqlite3.正如它在Sqlite内存数据库的描述页面上所说,"[A] ll共享内存数据库的数据库连接需要在同一个进程中." 这意味着,只要Django使用内存测试数据库并且Celery在一个单独的进程中启动,从根本上不可能让Celery和Django共享一个测试数据库.
但是,celery.contrib.testing.worker.start_worker可以在同一进程中的单独线程中启动Celery工作程序.该worker可以访问内存数据库.
这假设Celery已经以Django项目的常规方式设置.
因为Django-Celery涉及一些跨线程通信,所以只有不在隔离事务中运行的测试用例才有效.测试用例必须直接SimpleTestCase从其Rest等效继承APISimpleTestCase,并将class属性设置allow_database_queries为True.
关键是在setUpClass方法中启动Celery工作者并在方法中TestCase关闭它tearDownClass.关键功能是celery.contrib.testing.worker.start_worker(app),它需要一个当前Celery应用程序的实例,可能是从mysite.celery.appPython 获得并返回一个Python ContextManager,它有__enter__和__exit__方法,必须分别调用setUpClass和tearDownClass.可能有一种方法可以避免手动输入和存在ContextManager装饰器或其他东西,但我无法弄明白.这是一个示例tests.py文件:
from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase
from mysite.celery import app
class BatchSimulationTestCase(SimpleTestCase):
allow_database_queries = True
@classmethod
def setUpClass(cls):
super().setUpClass()
# Start up celery worker
cls.celery_worker = start_worker(app)
cls.celery_worker.__enter__()
@classmethod
def tearDownClass(cls):
super().tearDownClass()
# Close worker
cls.celery_worker.__exit__(None, None, None)
def test_my_function(self):
# my_task.delay() or something
Run Code Online (Sandbox Code Playgroud)
无论出于何种原因,测试工作者都会尝试使用一个被调用的任务'celery.ping',可能会在工作失败的情况下提供更好的错误消息.即使是设置perform_ping_check到False作为关键字参数OT start_worker还是测试了它的存在.它正在寻找的任务是celery.contrib.testing.tasks.ping.但是,默认情况下不安装此任务.它应该可以通过增加提供这项任务celery.contrib.testing,以INSTALLED_APPS在settings.py.但是,这只会使工人看到它; 而不是生成工人的代码.生成worker的代码会执行assert 'celery.ping' in app.tasks,但会失败.对此进行评论会使一切正常,但修改已安装的库并不是一个好的解决方案.我可能做错了什么,但我确定的解决方法是将简单函数复制到可以被拾取的地方app.autodiscover_tasks(),例如celery.py:
@app.task(name='celery.ping')
def ping():
# type: () -> str
"""Simple task that just returns 'pong'."""
return 'pong'
Run Code Online (Sandbox Code Playgroud)
现在,在运行测试时,无需启动单独的Celery进程.Celery工作程序将作为一个单独的线程在Django测试过程中启动.该worker可以看到任何内存数据库,包括默认的内存中测试数据库.要控制工作人员数量,可以使用选项start_worker,但默认情况下是单个工作人员.