zxy*_*too 15 python django decorator celery django-celery
我正在使用django和芹菜(django-celery)开展一个项目.我们的团队决定将所有数据访问代码(app-name)/manager.py包装在内(不要像管理员那样包装django),让(app-name)/task.py中的代码只处理汇编和执行芹菜任务(所以我们没有django此层中的ORM依赖项).
在我manager.py,我有这样的事情:
def get_tag(tag_name):
ctype = ContentType.objects.get_for_model(Photo)
try:
tag = Tag.objects.get(name=tag_name)
except ObjectDoesNotExist:
return Tag.objects.none()
return tag
def get_tagged_photos(tag):
ctype = ContentType.objects.get_for_model(Photo)
return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)
def get_tagged_photos_count(tag):
return get_tagged_photos(tag).count()
Run Code Online (Sandbox Code Playgroud)
在我的task.py中,我喜欢将它们包装成任务(然后可能使用这些任务来完成更复杂的任务),所以我写这个装饰器:
import manager #the module within same app containing data access functions
class mfunc_to_task(object):
def __init__(mfunc_type='get'):
self.mfunc_type = mfunc_type
def __call__(self, f):
def wrapper_f(*args, **kwargs):
callback = kwargs.pop('callback', None)
mfunc = getattr(manager, f.__name__)
result = mfunc(*args, **kwargs)
if callback:
if self.mfunc_type == 'get':
subtask(callback).delay(result)
elif self.mfunc_type == 'get_or_create':
subtask(callback).delay(result[0])
else:
subtask(callback).delay()
return result
return wrapper_f
Run Code Online (Sandbox Code Playgroud)
那么(还在task.py):
#@task
@mfunc_to_task()
def get_tag():
pass
#@task
@mfunc_to_task()
def get_tagged_photos():
pass
#@task
@mfunc_to_task()
def get_tagged_photos_count():
pass
Run Code Online (Sandbox Code Playgroud)
事情很好,没有@task.但是,在应用该@task装饰器(在芹菜文件指示的顶部)之后,事情就开始崩溃了.显然,每次mfunc_to_task.__call__调用时,相同的task.get_tag函数都会被传递为f.所以我wrapper_f每次都以同样的方式结束,现在我唯一能做的就是获得一个标签.
我是装饰员的新手.任何人都可以帮助我理解这里出了什么问题,或指出其他方法来完成任务?我真的不想为每个数据访问函数编写相同的任务包装代码.
mic*_*mit 19
不太确定为什么传递参数不起作用?
如果您使用此示例:
@task()
def add(x, y):
return x + y
Run Code Online (Sandbox Code Playgroud)
让我们添加一些日志记录到MyCoolTask:
from celery import task
from celery.registry import tasks
import logging
import celery
logger = logging.getLogger(__name__)
class MyCoolTask(celery.Task):
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
logger.info("Starting to run")
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point of the task whatever is the state
logger.info("Ending run")
pass
Run Code Online (Sandbox Code Playgroud)
并创建一个扩展类(扩展MyCoolTask,但现在使用参数):
class AddTask(MyCoolTask):
def run(self,x,y):
if x and y:
result=add(x,y)
logger.info('result = %d' % result)
return result
else:
logger.error('No x or y in arguments')
tasks.register(AddTask)
Run Code Online (Sandbox Code Playgroud)
并确保您将kwargs作为json数据传递:
{"x":8,"y":9}
Run Code Online (Sandbox Code Playgroud)
我得到了结果:
[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17
Run Code Online (Sandbox Code Playgroud)
而不是使用装饰器为什么你不创建扩展的基类celery.Task?
通过这种方式,您的所有任务都可以扩展您的自定义任务类,您可以使用方法__call__和方法实现您的个人行为after_return
.您还可以为所有任务定义常用方法和对象.
class MyCoolTask(celery.Task):
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point of the task whatever is the state
pass
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12968 次 |
| 最近记录: |