完成所有任务后运行任务

Naf*_*Kay 12 python rabbitmq celery

我正在编写一个需要并行运行一系列任务的应用程序,然后运行所有任务结果的单个任务:

@celery.task
def power(value, expo):
    return value ** expo

@celery.task
def amass(values):
    print str(values)
Run Code Online (Sandbox Code Playgroud)

这是一个非常人为和过于简单的例子,但希望这一点很好.基本上,我有很多需要运行的项目power,但我只想运行amass所有任务的结果.所有这些都应该异步发生,我不需要从该amass方法返回任何东西.

有没有人知道如何在芹菜中设置它,以便一切都是异步执行的,并且在完成所有操作后调用带有结果列表的单个回调?

我已经设置了这个例子,以chord推荐Alexander Afanasiev:

from time import sleep

import random

tasks = []

for i in xrange(10):
    tasks.append(power.s((i, 2)))
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms

callback = amass.s()

r = chord(tasks)(callback)
Run Code Online (Sandbox Code Playgroud)

不幸的是,在上面的例子中,tasks只有在chord调用方法时才会启动所有任务.有没有办法让每个任务可以单独启动,然后我可以添加一个回调到组,以便在一切都完成后运行?

Naf*_*Kay 6

这是一个对我有用的解决方案:

task.py

from time import sleep

import random

@celery.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
    return value ** expo

@celery.task
def amass(results, tasks):
    completed_tasks = []
    for task in tasks:
        if task.ready():
            completed_tasks.append(task)
            results.append(task.get())

    # remove completed tasks
    tasks = list(set(tasks) - set(completed_tasks))

    if len(tasks) > 0:
        # resend the task to execute at least 1 second from now
        amass.delay(results, tasks, countdown=1)
    else:
        # we done
        print results
Run Code Online (Sandbox Code Playgroud)

用例:

tasks = []

for i in xrange(10):
    tasks.append(power.delay(i, 2))

amass.delay([], tasks)
Run Code Online (Sandbox Code Playgroud)

应该做的是尽快异步启动所有任务。将它们全部发布到队列后,amass任务也将发布到队列。amass任务将继续重新发布自身,直到所有其他任务都已完成。


ale*_*cxe 5

Celery为您可以想象的大多数工作流程提供了大量工具

看来您需要使用chord。这是文档中的引用:

和弦就像一个组,但带有回调。和弦由标题组和正文组成,其中正文是在标题中的所有任务完成后应执行的任务。