I am running a Django site and have just gotten Celery working, but I am getting a confusing error. Here is how the code is structured.
In tests.py:
from tasks import *
from celery.result import AsyncResult
project = Project.objects.create()
# initialize various sub-objects of the project
c = function.delay(project.id)
r = AsyncResult(c.id).ready()
f = AsyncResult(c.id).failed()
# wait until the task is done
while not r and not f:
    r = AsyncResult(c.id).ready()
    f = AsyncResult(c.id).failed()
self.assertEqual() #will fail because task fails
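The wait loop above spins without a pause and never gives up if the task is never picked up by a worker. A minimal sketch of a bounded version follows; `wait_until`, `timeout`, and `interval` are hypothetical names introduced here for illustration and are not part of Celery or of the original code.

```python
import time


def wait_until(is_done, timeout=10.0, interval=0.1):
    """Poll is_done() until it returns True or `timeout` seconds elapse.

    Hypothetical helper for illustration only.
    Returns True if is_done() became true in time, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_done():
            return True
        time.sleep(interval)  # pause between polls instead of busy-waiting
    return is_done()  # one final check once the deadline has passed
```

In the test above this could be called as `wait_until(lambda: c.ready())`, assuming `c` is the result object returned by `function.delay(...)`. `ready()` also returns true once a task has failed, so a separate `failed()` poll should not be needed just to end the wait.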
In tasks.py:
from __future__ import absolute_import
from celery import shared_task
@shared_task
def function(project_id):
    # a bunch of calculations followed by a save of the project
    project = Project.objects.get(project=project_id) …