相关疑难解决方法(0)

如何链接一个将列表返回到组的Celery任务?

我想从Celery任务返回的列表中创建一个组,以便对于任务结果集中的每个项目,将一个任务添加到该组中.

这是一个简单的代码示例来解释用例.本???应该是从以前的任务的结果.

@celery.task
def get_list(amount):
    # In reality, fetch a list of items from a db
    return [i for i in range(amount)]

@celery.task
def process_item(item):
    #do stuff
    pass

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))
Run Code Online (Sandbox Code Playgroud)

我可能没有正确地接近这个,但我很确定从任务中调用任务是不安全的:

@celery.task
def process_list():
    for i in get_list.delay().get():
        process_item.delay(i)
Run Code Online (Sandbox Code Playgroud)

我不需要秒任务的结果.

python celery

22
推荐指数
1
解决办法
8604
查看次数

将芹菜任务的结果链接到一个分布式组

像在另一个问题中一样,我想从celery任务返回的列表中创建一个celery组。这个想法是,第一个任务将返回一个列表,第二个任务将将该列表分解为列表中每个项目的并发任务。

计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是一个链,该链下载页面,对其进行处理,然后将其上传到s3。最后,完成所有子页面后,该数据库在我们的数据库中被标记为完成。就像是:

chain(
    get_links_from_website.si('https://www.google.com'),
    dmap.s(  # <-- Distributed map
        download_sub_page.s() | 
        process_sub_page.s() | 
        upload_sub_page_to_s3.s()
    ),
    mark_website_done.s()
)
Run Code Online (Sandbox Code Playgroud)

到目前为止,我看到的解决方案似乎可以很好地解决此问题,但是当第二个任务是链时,由于clone没有进行深度复制的问题,该解决方案失败了(有关详细信息,请参见此答案的评论):

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()
Run Code Online (Sandbox Code Playgroud)

还有一个问题是,如果可迭代项的长度为10,000个,它将创建一个包含10,000个项目的组。可以想象,这消耗了我们的内存使用量。

因此,我正在寻找一种做到dmap这一点的方法:

  • 不会通过创建怪异的组来炸毁RAM(也许可以通过迭代来分块吗?)
  • 适用于芹菜链,而不会产生Deepcopy问题。

python celery chain

6
推荐指数
1
解决办法
1114
查看次数

标签 统计

celery ×2

python ×2

chain ×1