如何链接一个将列表返回到组的Celery任务?

Ome*_*tel 22 python celery

我想从Celery任务返回的列表中创建一个组,以便对于任务结果集中的每个项目,将一个任务添加到该组中.

这是一个简单的代码示例来解释用例.本???应该是从以前的任务的结果.

@celery.task
def get_list(amount):
    # In reality, fetch a list of items from a db
    return [i for i in range(amount)]

@celery.task
def process_item(item):
    #do stuff
    pass

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))
Run Code Online (Sandbox Code Playgroud)

我可能没有正确地接近这个,但我很确定从任务中调用任务是不安全的:

@celery.task
def process_list():
    for i in get_list.delay().get():
        process_item.delay(i)
Run Code Online (Sandbox Code Playgroud)

我不需要秒任务的结果.

小智 36

您可以使用中间任务获得此类行为.这是一个创建像地图一样的方法的演示,就像你建议的那样.

from celery import task, subtask, group

@task
def get_list(amount):
    return [i for i in range(amount)]

@task
def process_item(item):
    # do stuff
    pass

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s()))
Run Code Online (Sandbox Code Playgroud)

感谢Ask Solem在我向他提出类似问题的帮助时给我这个建议.

  • 注意 clone 只做一个浅拷贝。如果要克隆“复杂”签名(如链、组或和弦),则需要 (ab) 使用 python 的 deepcopy,如 [celery issue 2251](https://github.com/celery) 中所述/芹菜/问题/2251)。或者您将 `callback = subtask(callback)` 移动到创建函数的 for 循环中并删除 `clone`。 (4认同)
  • 我尝试过做一个两级版本,但没有成功。我在 /sf/ask/4130910171/ 提出了一个新问题 - 任何见解都值得赞赏。 (2认同)