考虑这个 Celery 工作流程:
wf = collect_items.s() | add_details.s() | publish_items.s()
Run Code Online (Sandbox Code Playgroud)
它收集一些项目,并行地为每个项目添加额外的细节,然后在某处发布装饰信息。
我想要的是add_details作为一组任务,每个项目一个,并行获取每个项目的详细信息。显然,该组必须从 输出的数据中生成collect_items。
这是我尝试过的,使用默认的 rabbitmq 代理:
app = Celery(backend="rpc://")
@app.task
def collect_items(n):
return range(n)
@app.task
def add_details(items):
return group(get_details.s(i) for i in items).delay()
@app.task
def get_details(item):
return (item, item * item)
@app.task
def publish_items(items):
print("items = %r" % items)
Run Code Online (Sandbox Code Playgroud)
我希望输出是数字 0-9,用它们的平方装饰,所有这些都是同时计算的:
>>> wf.delay(10).get()
items = [(0, 0), (1, 1), (2, 4), ... (8, 64), (9, 81)]
Run Code Online (Sandbox Code Playgroud)
这确实调用了预期的任务,但不幸的是,即使任务似乎已完成,也将结果publish_items作为一堆GroupResults包含AsyncResultsPENDING 状态传递给。
我等不及这些结果了,publish_items因为你不能get()在任务中使用(死锁风险等)。我认为 Celery 会识别像这样的任务何时add_details返回 GroupResult 并get在返回该值以传递给链中的下一个任务之前对其进行处理。
这似乎是一个常见的模式,无论如何在 Celery 中可以做到这一点?
我在这里看到过类似的问题,但答案似乎假设对 Celery 如何在幕后工作有很多深入了解,无论如何它们对我不起作用。
这是你的例子,工作方式略有不同,但在我看来,达到了预期的结果。
@app.task
def collect_items(n):
logger.info("collect %r items", n)
items = list(range(n))
return items
@app.task
def schedule_task_group(items):
logger.info("group get_details tasks & pass results to publish_items")
return (
group(get_details.s(i) for i in items) | publish_items.s()
).delay()
@app.task
def get_details(item):
logger.info("get item detail for = %r", item)
return (item, item * item)
@app.task
def publish_items(items):
logger.info("publish items = %r", items)
return items
print('schedule collect_items & pass result to schedule_task_group with n = 5')
(collect_items.s(5) | schedule_task_group.s()).delay()
Run Code Online (Sandbox Code Playgroud)
与您的代码的主要区别在于,我将get_details小组与publish_items任务链接起来,有效地使其成为一个和弦。文档中提到了这一点,并且这是必需的,因为您希望在将整个任务组传递给publish_items.
请查看@quantoid,并告诉我您的想法。请注意,使用该-l INFO标志运行 celery 可以更轻松地可视化工作人员中实际发生的情况。
参考: - http://docs.celeryproject.org/en/latest/userguide/canvas.html - /sf/answers/1060302001/
| 归档时间: |
|
| 查看次数: |
733 次 |
| 最近记录: |