Celery Group任务用于map/reduce工作流程

Ric*_*kyA 13 celery

我可以使用Celery Group原语作为map/reduce工作流程中的伞形任务吗?

或者更具体:组中的子任务是否可以在多个服务器上的多个工作程序上运行

来自文档:

However, if you call apply_async on the group it will send a special 
grouping task, so that the action of calling the tasks happens in a worker 
instead of the current process
Run Code Online (Sandbox Code Playgroud)

这似乎意味着任务都发送给一个工人......

在3.0(仍然)之前,可以在可以在多个服务器上运行的TaskSet中触发子任务.问题是确定所有任务是否已完成执行.这通常是通过轮询所有不太优雅的子任务来完成的.我想知道是否可以使用Group原语来缓解这个问题.

Ric*_*kyA 26

我发现有可能使用Chords这样的地图减少问题.

@celery.task(name='ic.mapper')
def mapper():
    #split your problem in embarrassingly parallel maps 
    maps = [map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s()]
    #and put them in a chord that executes them in parallel and after they finish calls 'reduce'
    mapreduce = celery.chord(maps)(reduce.s())    
    return "{0} mapper ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname)

@celery.task(name='ic.map')
def map():
    #do something useful here
    import time
    time.sleep(10.0)
    return "{0} map ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname)

@celery.task(name='ic.reduce')
def reduce(results):
    #put the maps together and do something with the results
    return "{0} reduce ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname)
Run Code Online (Sandbox Code Playgroud)

当映射器在三个工作者/服务器的集群上执行时,它首先执行映射器,该映射器分割您的问题并创建再次提交给代理的新子任务.它们并行运行,因为队列被所有代理使用.此外,还会创建一个和弦任务,轮询所有地图以查看它们是否已完成.完成后,执行reduce任务,您可以将结果粘合在一起.

总之:是的,这是可能的.谢谢蔬菜们!