相关疑难解决方法(0)

芹菜 - 在链子里面的小组

我想在链中使用一个组(或块),例如:

chain(getRange.s(3),  GROUP() , xsum.s() )
Run Code Online (Sandbox Code Playgroud)

GROUP()一组double()任务在哪里,即group(double(0),double(1),double(2)).在如何链接一个将列表返回到组中的Celery任务中发布了类似的问题但是没有解释如何将组中的输出传递给链中的下一个任务.

@task
def getRange(x):
    return range(x)

@task
def double(nr):
    return nr*2

@task
def xsum(list):
    return sum(list)
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

7
推荐指数
1
解决办法
3772
查看次数

将芹菜任务的结果链接到一个分布式组

像在另一个问题中一样,我想从celery任务返回的列表中创建一个celery组。这个想法是,第一个任务将返回一个列表,第二个任务将将该列表分解为列表中每个项目的并发任务。

计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是一个链,该链下载页面,对其进行处理,然后将其上传到s3。最后,完成所有子页面后,该数据库在我们的数据库中被标记为完成。就像是:

chain(
    get_links_from_website.si('https://www.google.com'),
    dmap.s(  # <-- Distributed map
        download_sub_page.s() | 
        process_sub_page.s() | 
        upload_sub_page_to_s3.s()
    ),
    mark_website_done.s()
)
Run Code Online (Sandbox Code Playgroud)

到目前为止,我看到的解决方案似乎可以很好地解决此问题,但是当第二个任务是链时,由于clone没有进行深度复制的问题,该解决方案失败了(有关详细信息,请参见此答案的评论):

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()
Run Code Online (Sandbox Code Playgroud)

还有一个问题是,如果可迭代项的长度为10,000个,它将创建一个包含10,000个项目的组。可以想象,这消耗了我们的内存使用量。

因此,我正在寻找一种做到dmap这一点的方法:

  • 不会通过创建怪异的组来炸毁RAM(也许可以通过迭代来分块吗?)
  • 适用于芹菜链,而不会产生Deepcopy问题。

python celery chain

6
推荐指数
1
解决办法
1114
查看次数

如何递归地将一个返回列表的 Celery 任务链接到一个组中?

我从这个问题开始:如何将一个返回列表的 Celery 任务链接到一个组中?

但我想扩展两次。所以在我的用例中,我有:

  • 任务 A:确定给定日期的项目总数
  • 任务 B:下载该日期的 1000 个元数据条目
  • 任务 C:下载一项的内容

所以每一步我都在扩大下一步的项目数量。我可以通过循环遍历任务中的结果并调用.delay()下一个任务函数来实现。但我想我会尽量不让我的主要任务这样做。相反,他们会返回一个元组列表——然后每个元组将被扩展为调用下一个函数的参数。

上述问题的答案似乎满足了我的需要,但我无法找出将其链接到两级扩展的正确方法。

这是我的代码的一个非常精简的示例:

from celery import group
from celery.task import subtask
from celery.utils.log import get_task_logger

from .celery import app

logger = get_task_logger(__name__)

@app.task
def task_range(upper=10):
    # wrap in list to make JSON serializer work
    return list(zip(range(upper), range(upper)))

@app.task
def add(x, y):
    logger.info(f'x is {x} and y is {y}')
    char = chr(ord('a') + x)
    char2 = chr(ord('a') + x*2)
    result = x + y
    logger.info(f'result is …
Run Code Online (Sandbox Code Playgroud)

python celery

5
推荐指数
1
解决办法
1001
查看次数

标签 统计

celery ×3

python ×3

celery-task ×1

chain ×1