我想从Celery任务返回的列表中创建一个组,以便对于任务结果集中的每个项目,将一个任务添加到该组中.
这是一个简单的代码示例来解释用例.本???应该是从以前的任务的结果.
@celery.task
def get_list(amount):
# In reality, fetch a list of items from a db
return [i for i in range(amount)]
@celery.task
def process_item(item):
#do stuff
pass
process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))
Run Code Online (Sandbox Code Playgroud)
我可能没有正确地接近这个,但我很确定从任务中调用任务是不安全的:
@celery.task
def process_list():
for i in get_list.delay().get():
process_item.delay(i)
Run Code Online (Sandbox Code Playgroud)
我不需要秒任务的结果.
像在另一个问题中一样,我想从celery任务返回的列表中创建一个celery组。这个想法是,第一个任务将返回一个列表,第二个任务将将该列表分解为列表中每个项目的并发任务。
计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是一个链,该链下载页面,对其进行处理,然后将其上传到s3。最后,完成所有子页面后,该数据库在我们的数据库中被标记为完成。就像是:
chain(
get_links_from_website.si('https://www.google.com'),
dmap.s( # <-- Distributed map
download_sub_page.s() |
process_sub_page.s() |
upload_sub_page_to_s3.s()
),
mark_website_done.s()
)
Run Code Online (Sandbox Code Playgroud)
到目前为止,我看到的解决方案似乎可以很好地解决此问题,但是当第二个任务是链时,由于clone没有进行深度复制的问题,该解决方案失败了(有关详细信息,请参见此答案的评论):
@task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
Run Code Online (Sandbox Code Playgroud)
还有一个问题是,如果可迭代项的长度为10,000个,它将创建一个包含10,000个项目的组。可以想象,这消耗了我们的内存使用量。
因此,我正在寻找一种做到dmap这一点的方法: