asyncio.CancelledError 和“_GatheringFuture 异常从未被检索”的奇怪行为

use*_*240 1 python asynchronous exception python-3.x python-asyncio

我在看import asyncio: Learn Python's AsyncIO #3 - Using Coroutines。老师举了以下例子:

import asyncio
import datetime

async def keep_printing(name):
    while True:
        print(name, end=" ")
        print(datetime.datetime.now())
        await asyncio.sleep(0.5)

async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task, 3)
    except asyncio.TimeoutError:
        print("Time's up!")


if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

输出有一个异常:

First 2020-08-11 14:53:12.079830
Second 2020-08-11 14:53:12.079830
Third 2020-08-11 14:53:12.080828 
First 2020-08-11 14:53:12.580865
Second 2020-08-11 14:53:12.580865
Third 2020-08-11 14:53:12.581901 
First 2020-08-11 14:53:13.081979
Second 2020-08-11 14:53:13.082408
Third 2020-08-11 14:53:13.082408 
First 2020-08-11 14:53:13.583497
Second 2020-08-11 14:53:13.583935
Third 2020-08-11 14:53:13.584946
First 2020-08-11 14:53:14.079666
Second 2020-08-11 14:53:14.081169
Third 2020-08-11 14:53:14.115689
First 2020-08-11 14:53:14.570694
Second 2020-08-11 14:53:14.571668
Third 2020-08-11 14:53:14.635769
First 2020-08-11 14:53:15.074124
Second 2020-08-11 14:53:15.074900
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
Run Code Online (Sandbox Code Playgroud)

讲师试图CancelledError通过添加try/exceptin来处理keep_printing

async def keep_printing(name):
    while True:
        print(name, end=" ")
        print(datetime.datetime.now())
        try:
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            print(name, "was cancelled!")
            break
Run Code Online (Sandbox Code Playgroud)

但是,同样的异常仍然发生:

# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
Run Code Online (Sandbox Code Playgroud)

然后讲师只是继续其他主题,再也没有回到这个例子来展示如何解决它。幸运的是,通过实验,我发现我们可以通过在async 函数try/except下添加另一个来修复它:except asyncio.TimeoutError:main

async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task, 3)
    except asyncio.TimeoutError:
        print("Time's up!")
        try:
            await group_task
        except asyncio.CancelledError:
            print("Main was cancelled!")
Run Code Online (Sandbox Code Playgroud)

最终输出是:

# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
Main was cancelled!
Run Code Online (Sandbox Code Playgroud)

事实上,对于这个版本的main,我们甚至不需要try...except asyncio.CancelledErrorin keep_printing。它仍然可以正常工作。

那是为什么?为什么醒目CancelledErrormain工作,但不是在keep_printing?视频讲师处理这个异常的方式只会让我更加困惑。他一开始就不需要更改任何代码keep_printing

小智 6

我认为你需要放在await前面asyncio.gather。所以这个调用取自您的代码:

    group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )

Run Code Online (Sandbox Code Playgroud)

需要改为:

    group_task = await asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )
Run Code Online (Sandbox Code Playgroud)

不知道为什么,我仍在学习这些东西。


ale*_*ame 5

让我们来看看发生了什么:

  1. 这段代码调度了三个协程被执行并返回Future对象group_task(内部类的实例_GatheringFuture)聚合结果。
group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )
Run Code Online (Sandbox Code Playgroud)
  1. 此代码等待未来完成并超时。如果发生超时,它会取消未来并引发asyncio.TimeoutError.
    try:
        await asyncio.wait_for(group_task, 3)
    except asyncio.TimeoutError:
        print("Time's up!")
Run Code Online (Sandbox Code Playgroud)
  1. 发生超时。让我们看看 asyncio 库的内部task.pywait_for执行以下操作:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
...
await waiter
...
await _cancel_and_wait(fut, loop=loop)  # _GatheringFuture.cancel() inside
raise exceptions.TimeoutError()
Run Code Online (Sandbox Code Playgroud)
  1. 当我们这样做时_GatheringFuture.cancel(),如果任何子任务实际上被取消,CancelledError则传播
class _GatheringFuture(futures.Future):
    ...
    def cancel(self):
        ...
        for child in self._children:
            if child.cancel():
                ret = True
        if ret:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return ret
Run Code Online (Sandbox Code Playgroud)

然后

...
if outer._cancel_requested:
    # If gather is being cancelled we must propagate the
    # cancellation regardless of *return_exceptions* argument.
    # See issue 32684.
    outer.set_exception(exceptions.CancelledError())
else:
    outer.set_result(results)
Run Code Online (Sandbox Code Playgroud)
  1. 因此,从收集中提取结果或异常更正确 future
async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task, 3)
    except asyncio.TimeoutError:
        print("Time's up!")

    try:
        result = await group_task
    except asyncio.CancelledError:
        print("Gather was cancelled")
Run Code Online (Sandbox Code Playgroud)


小智 0

当aw因超时而被取消时,wait_for会等待aw被取消。如果将 CancelledError 处理到协程中,您会收到超时错误。这在 3.7 版本中发生了变化。
\n示例

\n
import asyncio\nimport datetime\n\nasync def keep_printing(name):\n    print(datetime.datetime.now())\n    try:\n        await asyncio.sleep(3600)\n    except asyncio.exceptions.CancelledError:\n        print("done")\n\nasync def main():\n    try:\n        await asyncio.wait_for(keep_printing("First"), timeout=3)\n    except asyncio.exceptions.TimeoutError:\n        print("timeouted")\n\n\nif __name__ == "__main__":\n    asyncio.run(main())\n
Run Code Online (Sandbox Code Playgroud)\n

用于从 Task 或 Future 检索结果的收集方法,您有一个无限循环并且永远不会返回任何结果。如果 aws 序列中的任何 Task 或 Future 被取消( wait_for 发生的情况),则将其视为引发 CancelledError \xe2\x80\x93 ,在这种情况下,gather() 调用不会被取消。这是为了防止取消一个提交的任务/Future 导致其他任务/Future 被取消。
\n对于保护聚集方法,可以将其覆盖到盾牌上。

\n
import asyncio\nimport datetime\n\n\nasync def keep_printing(name):\n    while True:\n        print(name, datetime.datetime.now())\n        try:\n            await asyncio.sleep(0.5)\n        except asyncio.exceptions.CancelledError:\n            print(f"canceled {name}")\n            return None\n\nasync def main():\n    group_task = asyncio.shield(asyncio.gather(\n                     keep_printing("First"),\n                     keep_printing("Second"),\n                     keep_printing("Third"))\n                    )\n    try:\n        await asyncio.wait_for(group_task, 3)\n    except asyncio.exceptions.TimeoutError:\n        print("Done")\n\n\nif __name__ == "__main__":\n    asyncio.run(main())\n
Run Code Online (Sandbox Code Playgroud)\n