Doc*_*ard 3 python django celery celery-task celeryd
我正在运行一个Django网站并且刚刚让Celery运行,但是我遇到了令人困惑的错误.以下是代码的结构.
在tests.py中:
from tasks import *
from celery.result import AsyncResult
project = Project.objects.create()
# initalize various sub-objects of the project
c = function.delay(project.id)
r = AsyncResult(c.id).ready()
f = AsyncResult(c.id).failed()
# wait until the task is done
while not r and not f:
r = AsyncResult(c.id).ready()
f = AsyncResult(c.id).failed()
self.assertEqual() #will fail because task fails
Run Code Online (Sandbox Code Playgroud)
在tasks.py中:
from __future__ import absolute_import
from celery import shared_task
@shared_task
def function(project_id)
#a bunch of calculations followed by a save of the project
project = Project.objects.get(project=project_id)
for part in project.part_set.all():
partFunction(part.id)
p = Part.objects.get(id=part.id)
# add to various variables in project from variables in p
project.save()
Run Code Online (Sandbox Code Playgroud)
在mainapp/settings.py中:
BROKER_URL = "amqp://ipaddress"
CELERY_RESULT_BACKEND='amqp'
CELERY_ACCEPT_CONTENT = ['json','pickle','msgpack','yaml']
CELERY_IGNORE_RESULT = False
Run Code Online (Sandbox Code Playgroud)
celery调试控制台日志必须通过list/tuple:
[INFO/MainProcess] Received task: myapp.tasks.function[id]
[ERROR/MainProcess] Task myapp.tasks.function[id]
raised unexpected: ValueError('task args must be a list or tuple',)
Traceback:
File "/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
return self.run(*args, **kwargs)
File "/myapp/tasks.py", line 28, in function
p = Part.objects.get(id=part.id)
File "/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
**dict(self._get_exec_options(), **options)
File "/python2.7/site-packages/celery/app/base.py", line 351, in send_task
reply_to=reply_to or self.oid, **options
File "celery/app/amqp.py", line 252, in publish_task
raise ValueError('task args must be a list or tuple')
ValueError: task args must be a list or tuple
Run Code Online (Sandbox Code Playgroud)
我得到的错误如上所述AsyncResult(c.id).result: task args must be a list or tuple.这应该是一个简单的解决方案,但事实并非如此.当我把它变成这样的列表时:
inline = [project.id]
c = function.delay(inline)
Run Code Online (Sandbox Code Playgroud)
然后它改变了主意并告诉我 AsyncResult(c.id).result: int() argument must be a string or a number, not 'list'
你可以想象我对可能出现的问题非常困惑.
tasks.py
@shared_task
def function(app):
@app.task(name='myapp.function', bind=True)
def function(project_id):
Run Code Online (Sandbox Code Playgroud)
tests.py
c = function.s(project.id).delay()
Run Code Online (Sandbox Code Playgroud)
function.app打印
您在任务中的代码中收到错误,它在回溯中显示:
File "/myapp/tasks.py", line 28, in function
p = Part.objects.get(id=part.id)
Run Code Online (Sandbox Code Playgroud)
你的代码似乎是正确的,但从追溯看起来,芹菜有一个旧版本的任务腌制.非常重要的是,每当你改变内部的任何东西时你都会重新启动芹菜task.py(即使你改变了其他文件,但我不这么认为).这可能是你问题的原因,它让我陷入困境几次.
此外,没有理由part单独从数据库中提取数据库,p = Part.objects.get(id=part.id)因为在迭代查询集时已经获得了当前的部分实例for part in project.part_set.all():.这只是一个建议,你可能有更多的代码需要这一步.
作为旁注,如果它是您项目中的任务而不是某些可重用应用程序的一部分,只需使用常规@task装饰器,芹菜会找到它,但请确保您Celery正确配置应用程序,这是我的另一个帖子可以指导您完成:Celery/Django Single Tasks多次运行
因此,只要您正确配置了所有内容,就像以前一样使用它:
@task #or @shared_task
def function(project_id)
#a bunch of calculations followed by a save of the project
project = Project.objects.get(project=project_id)
....
Run Code Online (Sandbox Code Playgroud)
然后叫它:
result = function.delay(project.id)
Run Code Online (Sandbox Code Playgroud)
要么:
result = function.apply_async(args=(project.id,))
Run Code Online (Sandbox Code Playgroud)
显然我也建议不用芹菜直接调用它来测试任务function(project.id),但我相信你知道的.