使用依赖关系图执行Celery任务

Leo*_*nth 9 python celery

我希望Celery的任务取决于2个或更多其他任务的结果.我研究过Python + Celery:链接工作?http://pypi.python.org/pypi/celery-tasktree,但只有当任务只有一个依赖任务时,这些才是好的.

我知道TaskSet,但是当TaskSetResult.ready()变为True时,似乎没有办法立即执行回调.我现在想到的是有一个周期性的任务,每隔几[毫秒]左右轮询一次TaskSetResult.ready()并在它返回True时触发回调,但这对我来说听起来相当不优雅.

有什么建议?

vad*_*ipp 11

在Celery(3.0+)的最新版本中,您可以使用所谓的和弦来实现所需的效果:

来自http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives:

简单的和弦

chord原语使我们能够在组中的所有任务完成执行时添加要调用的回调,这对于非并行并行的算法通常是必需的:

 >>> from celery import chord
 >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
 >>> res.get()
 90
Run Code Online (Sandbox Code Playgroud)

免责声明:我自己还没试过.


Mau*_*cco 4

mrbox 是 true,您可以重试,直到结果准备好,但文档中不太清楚,当您重试时,您必须传递 setid 和子任务元素,并且为了恢复它,您必须使用下面的 map 函数是一个示例代码,用于解释我的意思。

\n\n
def run(self, setid=None, subtasks=None, **kwargs):\n\n    if not setid or not subtasks:\n        #Is the first time that I launch this task, I\'m going to launch the subtasks\n        \xe2\x80\xa6\n        tasks = []\n        for slice in slices:\n            tasks.append(uploadTrackSlice.subtask((slice,folder_name)))\n\n        job = TaskSet(tasks=tasks)\n        task_set_result = job.apply_async()\n        setid = task_set_result.taskset_id\n        subtasks = [result.task_id for result in task_set_result.subtasks]\n        self.retry(exc=Exception("Result not ready"), args=[setid,subtasks])\n\n    #Is a retry than we just have to check the results        \n    tasks_result = TaskSetResult(setid, map(AsyncResult,subtasks))\n    if not tasks_result.ready():\n        self.retry(exc=Exception("Result not ready"), args=[setid,subtasks])\n    else:    \n        if tasks_result.successful():\n            return tasks_result.join()\n        else:\n            raise Exception("Some of the tasks was failing")\n
Run Code Online (Sandbox Code Playgroud)\n