当Django测试用例运行时,它会创建一个独立的测试数据库,以便在每个测试完成时回滚数据库写入.我正在尝试与Celery创建集成测试,但我无法弄清楚如何将Celery连接到这个短暂的测试数据库.在天真的设置中,保存在Django中的对象对Celery是不可见的,并且保存在Celery中的对象会无限期地保留.
这是一个示例测试用例:
import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response
class MyTestCase(APITestCase):
@classmethod
def setUpTestData(cls):
# This object is not visible to Celery
MyModel(id='test_object').save()
def test_celery_integration(self):
# This view spawns a Celery task
# Task should see MyModel.objects.get(id='test_object'), but can't
http_response = self.client.post('/', 'test_data', format='json')
result = get_result_from_response(http_response)
result.get() # Wait for task to finish before ending test case
# Objects saved by Celery task should be deleted, but persist
Run Code Online (Sandbox Code Playgroud)
我有两个问题:
如何让Celery可以看到Django测试用例的对象?
如何确保Celery保存的所有对象在测试完成后自动回滚?
如果无法自动执行此操作,我愿意手动清理对象,但是 …