我想从Celery任务返回的列表中创建一个组,以便对于任务结果集中的每个项目,将一个任务添加到该组中.
这是一个简单的代码示例来解释用例.本???应该是从以前的任务的结果.
@celery.task
def get_list(amount):
# In reality, fetch a list of items from a db
return [i for i in range(amount)]
@celery.task
def process_item(item):
#do stuff
pass
process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))
Run Code Online (Sandbox Code Playgroud)
我可能没有正确地接近这个,但我很确定从任务中调用任务是不安全的:
@celery.task
def process_list():
for i in get_list.delay().get():
process_item.delay(i)
Run Code Online (Sandbox Code Playgroud)
我不需要秒任务的结果.
所以基本上我有一个非常复杂的工作流程,看起来类似于:
>>> res = (add.si(2, 2) | add.s(4) | add.s(8))()
>>> res.get()
16
Run Code Online (Sandbox Code Playgroud)
之后,我走向结果链并收集所有个别结果对我来说相当微不足道:
>>> res.parent.get()
8
>>> res.parent.parent.get()
4
Run Code Online (Sandbox Code Playgroud)
我的问题是,如果我的第三个任务取决于知道第一个任务的结果怎么办,但在这个例子中只收到第二个任务的结果?
链也很长,结果也不小,所以只是通过输入就会不必要地污染结果存储.哪个是Redis,所以使用RabbitMQ,ZeroMQ时的限制......不适用.