我希望Celery的任务取决于2个或更多其他任务的结果.我研究过Python + Celery:链接工作?和http://pypi.python.org/pypi/celery-tasktree,但只有当任务只有一个依赖任务时,这些才是好的.
我知道TaskSet,但是当TaskSetResult.ready()变为True时,似乎没有办法立即执行回调.我现在想到的是有一个周期性的任务,每隔几[毫秒]左右轮询一次TaskSetResult.ready()并在它返回True时触发回调,但这对我来说听起来相当不优雅.
有什么建议?
vad*_*ipp 11
在Celery(3.0+)的最新版本中,您可以使用所谓的和弦来实现所需的效果:
来自http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives:
简单的和弦
chord原语使我们能够在组中的所有任务完成执行时添加要调用的回调,这对于非并行并行的算法通常是必需的:
>>> from celery import chord
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
>>> res.get()
90
Run Code Online (Sandbox Code Playgroud)
免责声明:我自己还没试过.
mrbox 是 true,您可以重试,直到结果准备好,但文档中不太清楚,当您重试时,您必须传递 setid 和子任务元素,并且为了恢复它,您必须使用下面的 map 函数是一个示例代码,用于解释我的意思。
\n\ndef run(self, setid=None, subtasks=None, **kwargs):\n\n if not setid or not subtasks:\n #Is the first time that I launch this task, I\'m going to launch the subtasks\n \xe2\x80\xa6\n tasks = []\n for slice in slices:\n tasks.append(uploadTrackSlice.subtask((slice,folder_name)))\n\n job = TaskSet(tasks=tasks)\n task_set_result = job.apply_async()\n setid = task_set_result.taskset_id\n subtasks = [result.task_id for result in task_set_result.subtasks]\n self.retry(exc=Exception("Result not ready"), args=[setid,subtasks])\n\n #Is a retry than we just have to check the results \n tasks_result = TaskSetResult(setid, map(AsyncResult,subtasks))\n if not tasks_result.ready():\n self.retry(exc=Exception("Result not ready"), args=[setid,subtasks])\n else: \n if tasks_result.successful():\n return tasks_result.join()\n else:\n raise Exception("Some of the tasks was failing")\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
7854 次 |
| 最近记录: |