我有一个用Django编写的REST API,其中和端点在发布芹菜任务时对其进行排队.响应包含任务ID,我想用它来测试任务是否已创建并获得结果.所以,我想做的事情如下:
def test_async_job():
response = self.client.post("/api/jobs/", some_test_data, format="json")
task_id = response.data['task_id']
result = my_task.AsyncResult(task_id).get()
self.assertEquals(result, ...)
Run Code Online (Sandbox Code Playgroud)
我显然不想经营一个芹菜工人来进行单元测试,我希望以某种方式嘲笑它.我不能使用CELERY_ALWAYS_EAGER因为这似乎完全绕过经纪人,防止我使用AsyncResult通过其ID来获得任务(如说在这里).
通过芹菜和kombu文档,我发现有一个内存传输单元测试,这将做我正在寻找的.我尝试覆盖BROKER_URL设置以在测试中使用它:
@override_settings(BROKER_URL='memory://')
def test_async_job():
Run Code Online (Sandbox Code Playgroud)
但行为与ampq代理相同:它会阻止测试等待结果.任何想法我怎么配置这个经纪人让它在测试中工作?
我需要在倒计时后运行芹菜任务,但能够在某些条件下重置倒计时.例如,我要打电话apply_async用countdown=15,但如果某些事件发生时执行任务之前,我想再次设定倒计时15秒.
从文档和谷歌上搜索我想我可以通过保存的任务ID,然后撤销,如果事件发生重建任务的完成这个任务.我想知道是否有更优雅的方法.