芹菜链从任务映射到另一个

sha*_*ane 5 python celery

我有芹菜的申请

@task
def step_one(items):
    items.sort()
    for i in range(len(items), 100):
        yield items[i:i + 100]

@task
def step_two(batch):
    for item in batch:
        print(item)

if __name__ == '__main__':
    celery.chain(step_one.s([i for i in range 100000]), step_two.map().s()).delay()
Run Code Online (Sandbox Code Playgroud)

这不起作用,因为step_two期望使用迭代器,但是我希望它获取上一步的结果,正确的方法是什么?如果任务可以从而yield不是在对整个列表进行排序和分块之后开始,那也很好。

我已经看到了这个问题,这是将生成的列表映射到celery中的任务的最佳方法,但是我觉得没有必要创建一个将结果映射到另一个任务的中间步骤。另外,打电话step_two.map.s()给我一个错误function object has no attribute s