我在 Celery 中使用 Chord 来实现在一组并行任务完成执行时调用的回调。具体来说,我有一组函数来包装对外部 API 的调用。在处理结果并在 Chord 回调中更新我的数据库之前,我想等待所有这些返回。我希望回调在所有 API 调用完成后执行,而不管它们的状态如何。
我的问题是只有当组的子任务都没有引发异常时才会调用回调函数。但是,如果一个子任务引发异常,则会on_error()调用一个可选的错误处理程序,并使用字符串表示task_id和弦。组中的其余任务会继续执行,但永远不会调用回调。
我将用下面的例子来说明这一点:
@app.task
def maybe_succeed():
divisor = randint(0, 10)
return 1 / divisor
@app.task
def master_task():
g = group([maybe_succeed.s() for i in range(100)])
c = g | chord_callback.s()
return c.delay()
@app.task
def chord_callback(results):
print 'Made it here!'
Run Code Online (Sandbox Code Playgroud)
在上面的例子中,调用master_task()将运行组中的所有任务,但是,回调永远不会被调用,因为其中一个maybe_succeed()会失败(除非你非常幸运!)。
现在,我正在通过在我的等效项中捕获所有异常来处理这个问题,maybe_succeed()以便和弦永远不会失败。我想这是一个很好的解决方案,尽管它感觉不到不对。
所以,我的问题是:有没有办法让 Celery Chord 回调执行而不管其组的子任务的返回状态如何?