dyn*_*set 5 python chord celery celery-task
我正在对 celery 任务进行单元测试。我的连锁任务也有组,所以产生了一个和弦。
测试应该如下所示:
我尝试了以下方法:
def wait_for_result(result):
result.get()
for child in result.children or list():
if isinstance(child, GroupResult):
# tried looping over task result in group
# until tasks are ready, but without success
pass
wait_for_result(child)
Run Code Online (Sandbox Code Playgroud)
这会造成死锁,chord_unlock 会永远重试。我对任务结果不感兴趣。如何等待所有子任务完成?
小智 5
虽然这是一个老问题,但我只是想分享我如何摆脱僵局问题,以防万一它对某人有帮助。
就像芹菜日志所说,永远不要get()在任务中使用。这确实会造成僵局。
我有一组类似的芹菜任务,其中包括一系列小组任务,因此使其成为一个和弦。我使用龙卷风通过发出 HTTP 请求来调用这些任务。所以我所做的是这样的:
@task
def someFunction():
....
@task
def someTask():
....
@task
def celeryTask():
groupTask = group([someFunction.s(i) for i in range(10)])
job = (groupTask| someTask.s())
return job
Run Code Online (Sandbox Code Playgroud)
当celeryTask()被tornado调用时,链将开始执行,并且 的UUIDsomeTask()将被保存在job. 它看起来像
异步结果:765b29a8-7873-4b28-b05c-7e19c33e950c
该 UUID 会celeryTask()在链开始执行(理想情况下)之前返回并退出,从而为另一个进程的运行留下空间。
然后我使用龙卷风层来检查任务的状态。有关龙卷风层的详细信息可以在这个stackoverflow 问题中找到
你尝试过和弦+回调吗?
http://docs.celeryproject.org/en/latest/userguide/canvas.html#chords
>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
14251 次 |
| 最近记录: |