我正在尝试在 Celery 任务中使用嵌套链。下面是我面临的问题类型的人为示例。
add_tasks.py:
from celery import Celery, chain
app = Celery('add_tasks', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'redis://localhost/0'
@app.task
def add(x, y):
return x + y
@app.task
def inner_chain(x):
# range parameter could change
task_list = [add.s(x,x)] + [add.s(x) for _ in xrange(10)]
return chain(*task_list)() #.get()
@app.task
def outer_chain():
return chain(
add.s(1,1),
inner_chain.s()
)() #.get()
Run Code Online (Sandbox Code Playgroud)
添加_tester.py:
from add_tasks import outer_chain
r = outer_chain.delay()
print r.get()
print r.get().get()
print r.get().get().get()
Run Code Online (Sandbox Code Playgroud)
目前我将获得以下形式的输出:
Result-ID
Result-ID
Result
Run Code Online (Sandbox Code Playgroud)
在我的两个链任务中取消注释 .get() 会导致死锁,并导致关于运行同步子任务的警告,但我不知道如何传递子任务的结果。
做这种事情的正确方法是什么?我需要在我的任务中使用 apply_async 吗?
| 归档时间: |
|
| 查看次数: |
616 次 |
| 最近记录: |