Celery 工作流如何包含动态生成的组?

qua*_*oid 6 python celery

考虑这个 Celery 工作流程:

wf = collect_items.s() | add_details.s() | publish_items.s()
Run Code Online (Sandbox Code Playgroud)

它收集一些项目,并行地为每个项目添加额外的细节,然后在某处发布装饰信息。

我想要的是add_details作为一组任务,每个项目一个,并行获取每个项目的详细信息。显然,该组必须从 输出的数据中生成collect_items

这是我尝试过的,使用默认的 rabbitmq 代理:

app = Celery(backend="rpc://")

@app.task
def collect_items(n):
    return range(n)

@app.task
def add_details(items):
    return group(get_details.s(i) for i in items).delay()

@app.task
def get_details(item):
    return (item, item * item)

@app.task
def publish_items(items):
    print("items = %r" % items)
Run Code Online (Sandbox Code Playgroud)

我希望输出是数字 0-9,用它们的平方装饰,所有这些都是同时计算的:

>>> wf.delay(10).get()
items = [(0, 0), (1, 1), (2, 4), ... (8, 64), (9, 81)]
Run Code Online (Sandbox Code Playgroud)

这确实调用了预期的任务,但不幸的是,即使任务似乎已完成,也将结果publish_items作为一堆GroupResults包含AsyncResultsPENDING 状态传递给。

我等不及这些结果了,publish_items因为你不能get()在任务中使用(死锁风险等)。我认为 Celery 会识别像这样的任务何时add_details返回 GroupResult 并get在返回该值以传递给链中的下一个任务之前对其进行处理。

这似乎是一个常见的模式,无论如何在 Celery 中可以做到这一点?

我在这里看到过类似的问题,但答案似乎假设对 Celery 如何在幕后工作有很多深入了解,无论如何它们对我不起作用。

tut*_*uju 3

这是你的例子,工作方式略有不同,但在我看来,达到了预期的结果。

@app.task
def collect_items(n):
    logger.info("collect %r items", n)
    items = list(range(n))
    return items


@app.task
def schedule_task_group(items):
    logger.info("group get_details tasks & pass results to publish_items")
    return (
        group(get_details.s(i) for i in items) | publish_items.s()
    ).delay()


@app.task
def get_details(item):
    logger.info("get item detail for = %r", item)
    return (item, item * item)


@app.task
def publish_items(items):
    logger.info("publish items = %r", items)
    return items


print('schedule collect_items & pass result to schedule_task_group with n = 5')
(collect_items.s(5) | schedule_task_group.s()).delay()
Run Code Online (Sandbox Code Playgroud)

与您的代码的主要区别在于,我将get_details小组与publish_items任务链接起来,有效地使其成为一个和弦。文档中提到了这一点,并且这是必需的,因为您希望在将整个任务组传递给publish_items.

请查看@quantoid,并告诉我您的想法。请注意,使用该-l INFO标志运行 celery 可以更轻松地可视化工作人员中实际发生的情况。

参考: - http://docs.celeryproject.org/en/latest/userguide/canvas.html - /sf/answers/1060302001/