16 python exception cancellation python-3.x python-asyncio
如果 的一个任务gather引发异常,其他任务仍然可以继续。
嗯,这不完全是我需要的。我想区分致命的错误和需要取消所有剩余任务的错误,以及不是而是应该记录的错误,同时允许其他任务继续。
这是我实现这一点的失败尝试:
from asyncio import gather, get_event_loop, sleep
class ErrorThatShouldCancelOtherTasks(Exception):
pass
async def my_sleep(secs):
await sleep(secs)
if secs == 5:
raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
print(f'Slept for {secs}secs.')
async def main():
try:
sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
await sleepers
except ErrorThatShouldCancelOtherTasks:
print('Fatal error; cancelling')
sleepers.cancel()
finally:
await sleep(5)
get_event_loop().run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
(finally await sleep这里是为了防止解释器立即关闭,这会自行取消所有任务)
奇怪的是,呼吁cancel在gather实际上并没有取消它!
PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.
Run Code Online (Sandbox Code Playgroud)
我对这种行为感到非常惊讶,因为它似乎与文档相矛盾,其中指出:
asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)从给定的协程对象或期货返回未来聚合结果。
(……)
取消:如果外部 Future 被取消,则所有子级(尚未完成)也将被取消。(……)
我在这里缺少什么?如何取消剩余的任务?
use*_*342 16
您的实现的问题在于它sleepers.cancel()在sleepers已经提出之后调用。从技术上讲,返回的未来gather()处于完成状态,因此其取消必须是空操作。
要更正代码,您只需要自己取消孩子,而不是相信gather未来会这样做。当然,协程本身不可取消,因此您需要先将它们转换为任务(gather无论如何都可以,因此您无需做额外的工作)。例如:
async def main():
tasks = [asyncio.ensure_future(my_sleep(secs))
for secs in [2, 5, 7]]
try:
await asyncio.gather(*tasks)
except ErrorThatShouldCancelOtherTasks:
print('Fatal error; cancelling')
for t in tasks:
t.cancel()
finally:
await sleep(5)
Run Code Online (Sandbox Code Playgroud)
我对这种行为感到非常惊讶,因为它似乎与文档相矛盾 [...]
最初的绊脚石gather是它并没有真正运行任务,它只是一个等待它们完成的助手。出于这个原因gather,如果其中一些任务因异常而失败,则不会费心取消剩余的任务——它只是放弃等待并传播异常,让剩余的任务在后台继续进行。这被报告为一个错误,但没有修复向后兼容性,因为行为被记录下来并且从一开始就没有改变。但这里我们还有另一个问题:文档明确承诺能够取消返回的未来。您的代码正是这样做的,但它不起作用,而且原因很明显(至少我花了一段时间才弄明白,并且需要阅读源代码)。事实证明,合同Future实际上阻止了这种工作。当你调用时cancel(),返回的未来gather已经完成,取消一个完成的未来是没有意义的,它只是空操作。(原因是一个完整的未来有一个明确定义的结果,可以被外部代码观察到。取消它会改变它的结果,这是不允许的。)
换句话说,文档并没有错,因为如果您在完成之前执行了取消操作,它就会起作用await sleepers。但是,它具有误导性,因为它似乎允许gather()在这个重要用例中取消其可等待的提升之一,但实际上不允许。
像这样在使用时出现的问题gather是许多人热切期待(没有双关语意)在 asyncio 中的三重奏风格托儿所的原因。
Tob*_*ist 10
当发生任何异常时,这将取消其所有子项:
import asyncio
async def gather(*tasks, **kwargs):
tasks = [ task if isinstance(task, asyncio.Task) else asyncio.create_task(task)
for task in tasks ]
try:
return await asyncio.gather(*tasks, **kwargs)
except BaseException as e:
for task in tasks:
task.cancel()
raise e
# If a() or b() raises an exception, both are immediately cancelled
a_result, b_result = await gather(a(), b())
Run Code Online (Sandbox Code Playgroud)
您可以使用 Python 3.10(可能还有早期版本)执行以下操作:使用asyncio.wait. 它需要一个可迭代的可等待对象和一个关于何时返回的条件,当满足条件时,它返回两组任务:已完成的任务和待处理的任务。您可以让它在第一个异常时返回,然后逐个取消待处理的任务:
async def my_task(x):
try:
...
except RecoverableError as e:
...
tasks = [asyncio.crate_task(my_task(x)) for x in xs]
done, pending = await asyncio.wait(taksk, return_when=asyncio.FIRST_EXCEPTION)
for p in pending:
p.cancel()
Run Code Online (Sandbox Code Playgroud)
您可以将任务包装在 try- except 中,重新引发致命异常并处理非致命异常。事实并非如此gather,但看起来它可以满足您的要求。
https://docs.python.org/3/library/asyncio-task.html#id9
| 归档时间: |
|
| 查看次数: |
3692 次 |
| 最近记录: |