我想在链中使用一个组(或块),例如:
chain(getRange.s(3), GROUP() , xsum.s() )
Run Code Online (Sandbox Code Playgroud)
GROUP()一组double()任务在哪里,即group(double(0),double(1),double(2)).在如何链接一个将列表返回到组中的Celery任务中发布了类似的问题?但是没有解释如何将组中的输出传递给链中的下一个任务.
@task
def getRange(x):
return range(x)
@task
def double(nr):
return nr*2
@task
def xsum(list):
return sum(list)
Run Code Online (Sandbox Code Playgroud) 像在另一个问题中一样,我想从celery任务返回的列表中创建一个celery组。这个想法是,第一个任务将返回一个列表,第二个任务将将该列表分解为列表中每个项目的并发任务。
计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是一个链,该链下载页面,对其进行处理,然后将其上传到s3。最后,完成所有子页面后,该数据库在我们的数据库中被标记为完成。就像是:
chain(
get_links_from_website.si('https://www.google.com'),
dmap.s( # <-- Distributed map
download_sub_page.s() |
process_sub_page.s() |
upload_sub_page_to_s3.s()
),
mark_website_done.s()
)
Run Code Online (Sandbox Code Playgroud)
到目前为止,我看到的解决方案似乎可以很好地解决此问题,但是当第二个任务是链时,由于clone没有进行深度复制的问题,该解决方案失败了(有关详细信息,请参见此答案的评论):
@task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
Run Code Online (Sandbox Code Playgroud)
还有一个问题是,如果可迭代项的长度为10,000个,它将创建一个包含10,000个项目的组。可以想象,这消耗了我们的内存使用量。
因此,我正在寻找一种做到dmap这一点的方法:
我从这个问题开始:如何将一个返回列表的 Celery 任务链接到一个组中?
但我想扩展两次。所以在我的用例中,我有:
所以每一步我都在扩大下一步的项目数量。我可以通过循环遍历任务中的结果并调用.delay()下一个任务函数来实现。但我想我会尽量不让我的主要任务这样做。相反,他们会返回一个元组列表——然后每个元组将被扩展为调用下一个函数的参数。
上述问题的答案似乎满足了我的需要,但我无法找出将其链接到两级扩展的正确方法。
这是我的代码的一个非常精简的示例:
from celery import group
from celery.task import subtask
from celery.utils.log import get_task_logger
from .celery import app
logger = get_task_logger(__name__)
@app.task
def task_range(upper=10):
# wrap in list to make JSON serializer work
return list(zip(range(upper), range(upper)))
@app.task
def add(x, y):
logger.info(f'x is {x} and y is {y}')
char = chr(ord('a') + x)
char2 = chr(ord('a') + x*2)
result = x + y
logger.info(f'result is …Run Code Online (Sandbox Code Playgroud)