Celery 将并行任务链接成和弦

Jak*_*sen 4 python redis celery celery-task python-3.x

我正在尝试理解这个异步任务处理设置。到目前为止,我一直在考虑使用 Celery,但还没有锁定任何东西。唯一的要求是我可以使用 Redis 作为代理并将任务分发到多个节点上。

       ->  Task2  ->  Task3
Task1  ->  Task2  ->  Task3    [then]    Task4
       ->  Task2  ->  Task3
Run Code Online (Sandbox Code Playgroud)

解释

  • Task1 生成一个项目列表
  • Task2 从 Task1 接收一项作为参数
  • Task2 和 Task3 是链式的,并且每个链都是并行执行的
  • 当所有 Task2-Task3 链完成时执行 Task4(不需要从 Task3 传递任何数据)

那么问题是,我该如何用芹菜做到这一点?

Ant*_*nko 5

可以使用弦函数和链函数来完成,请看一下示例。它应该适合您的需求。

from celery import Celery, chord, chain

backend = 'redis://redis:6379/'
app = Celery(result_backend=backend, backend=backend)


@app.task
def task1():
    argument = 123
    return chord([
        chain(task2.s(argument), task3.s()),
        chain(task2.s(argument), task3.s()),
        chain(task2.s(argument), task3.s()),
    ])(task4.s())


@app.task
def task2(argument):
    pass


@app.task
def task3(result_task2):
    pass


@app.task
def task4(result):
    pass


task1.apply_async()
Run Code Online (Sandbox Code Playgroud)