如果一个任务失败,如何取消收集中的所有剩余任务?

16 python exception cancellation python-3.x python-asyncio

如果 的一个任务gather引发异常,其他任务仍然可以继续。

嗯,这不完全是我需要的。我想区分致命的错误和需要取消所有剩余任务的错误,以及不是而是应该记录的错误,同时允许其他任务继续。

这是我实现这一点的失败尝试:

from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for {secs}secs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

finally await sleep这里是为了防止解释器立即关闭,这会自行取消所有任务)

奇怪的是,呼吁cancelgather实际上并没有取消它!

PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.
Run Code Online (Sandbox Code Playgroud)

我对这种行为感到非常惊讶,因为它似乎与文档相矛盾,其中指出:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

从给定的协程对象或期货返回未来聚合结果。

(……)

取消:如果外部 Future 被取消,则所有子级(尚未完成)也将被取消。(……)

我在这里缺少什么?如何取消剩余的任务?

use*_*342 16

您的实现的问题在于它sleepers.cancel()sleepers已经提出之后调用。从技术上讲,返回的未来gather()处于完成状态,因此其取消必须是空操作。

要更正代码,您只需要自己取消孩子,而不是相信gather未来会这样做。当然,协程本身不可取消,因此您需要先将它们转换为任务(gather无论如何都可以,因此您无需做额外的工作)。例如:

async def main():
    tasks = [asyncio.ensure_future(my_sleep(secs))
             for secs in [2, 5, 7]]
    try:
        await asyncio.gather(*tasks)
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        for t in tasks:
            t.cancel()
    finally:
        await sleep(5)
Run Code Online (Sandbox Code Playgroud)

我对这种行为感到非常惊讶,因为它似乎与文档相矛盾 [...]

最初的绊脚石gather是它并没有真正运行任务,它只是一个等待它们完成的助手。出于这个原因gather,如果其中一些任务因异常而失败,则不会费心取消剩余的任务——它只是放弃等待并传播异常,让剩余的任务在后台继续进行。这被报告为一个错误,但没有修复向后兼容性,因为行为被记录下来并且从一开始就没有改变。但这里我们还有另一个问题:文档明确承诺能够取消返回的未来。您的代码正是这样做的,但它不起作用,而且原因很明显(至少我花了一段时间才弄明白,并且需要阅读源代码)。事实证明,合同Future实际上阻​​止了这种工作。当你调用时cancel(),返回的未来gather已经完成,取消一个完成的未来是没有意义的,它只是空操作。(原因是一个完整的未来有一个明确定义的结果,可以被外部代码观察到。取消它会改变它的结果,这是不允许的。)

换句话说,文档并没有,因为如果您在完成之前执行了取消操作,它就会起作用await sleepers。但是,它具有误导性,因为它似乎允许gather()在这个重要用例中取消其可等待的提升之一,但实际上不允许。

像这样在使用时出现的问题gather是许多人热切期待(没有双关语意)在 asyncio 中的三重奏风格托儿所的原因


Tob*_*ist 10

您可以创建自己的自定义收集功能

当发生任何异常时,这将取消其所有子项:

import asyncio

async def gather(*tasks, **kwargs):
    tasks = [ task if isinstance(task, asyncio.Task) else asyncio.create_task(task)
              for task in tasks ]
    try:
        return await asyncio.gather(*tasks, **kwargs)
    except BaseException as e:
        for task in tasks:
            task.cancel()
        raise e


# If a() or b() raises an exception, both are immediately cancelled
a_result, b_result = await gather(a(), b())
Run Code Online (Sandbox Code Playgroud)


Ibo*_*lit 6

您可以使用 Python 3.10(可能还有早期版本)执行以下操作:使用asyncio.wait. 它需要一个可迭代的可等待对象和一个关于何时返回的条件,当满足条件时,它返回两组任务:已完成的任务和待处理的任务。您可以让它在第一个异常时返回,然后逐个取消待处理的任务:


async def my_task(x):
    try: 
        ...
    except RecoverableError as e:
        ...


tasks = [asyncio.crate_task(my_task(x)) for x in xs]
done, pending = await asyncio.wait(taksk, return_when=asyncio.FIRST_EXCEPTION)
for p in pending:
    p.cancel()
Run Code Online (Sandbox Code Playgroud)

您可以将任务包装在 try- except 中,重新引发致命异常并处理非致命异常。事实并非如此gather,但看起来它可以满足您的要求。

https://docs.python.org/3/library/asyncio-task.html#id9