在中断事件循环后清理的正确方法是什么?

Nic*_*mas 40 python python-3.4 python-asyncio

我有一个事件循环,它运行一些协同例程作为命令行工具的一部分.用户可以使用通常的Ctrl+ 中断工具C,此时我想在中断的事件循环后正确清理.

这是我尝试过的.

import asyncio


@asyncio.coroutine
def shleepy_time(seconds):
    print("Shleeping for {s} seconds...".format(s=seconds))
    yield from asyncio.sleep(seconds)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Side note: Apparently, async() will be deprecated in 3.4.4.
    # See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
    tasks = [
        asyncio.async(shleepy_time(seconds=5)),
        asyncio.async(shleepy_time(seconds=10))
    ]

    try:
        loop.run_until_complete(asyncio.gather(*tasks))
    except KeyboardInterrupt as e:
        print("Caught keyboard interrupt. Canceling tasks...")

        # This doesn't seem to be the correct solution.
        for t in tasks:
            t.cancel()
    finally:
        loop.close()
Run Code Online (Sandbox Code Playgroud)

运行此命令并达到Ctrl+ C收益率:

$ python3 asyncio-keyboardinterrupt-example.py 
Shleeping for 5 seconds...
Shleeping for 10 seconds...
^CCaught keyboard interrupt. Canceling tasks...
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(0)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>
Run Code Online (Sandbox Code Playgroud)

显然,我没有正确清理.我想也许cancel()可以通过任务来完成任务.

在中断事件循环后清理的正确方法是什么?

dan*_*ano 40

当您按CTRL + C时,事件循环将停止,因此您的调用t.cancel()实际上不会生效.对于要取消的任务,您需要重新启动循环.

这是你如何处理它:

import asyncio

@asyncio.coroutine
def shleepy_time(seconds):
    print("Shleeping for {s} seconds...".format(s=seconds))
    yield from asyncio.sleep(seconds)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Side note: Apparently, async() will be deprecated in 3.4.4.
    # See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
    tasks = asyncio.gather(
        asyncio.async(shleepy_time(seconds=5)),
        asyncio.async(shleepy_time(seconds=10))
    )

    try:
        loop.run_until_complete(tasks)
    except KeyboardInterrupt as e:
        print("Caught keyboard interrupt. Canceling tasks...")
        tasks.cancel()
        loop.run_forever()
        tasks.exception()
    finally:
        loop.close()
Run Code Online (Sandbox Code Playgroud)

一旦我们赶上KeyboardInterrupt,我们打电话tasks.cancel()然后loop再次启动.run_forever实际上将尽快退出tasks被取消(请注意,取消了Future由归国asyncio.gather也将取消所有Futures的这里面),因为中断loop.run_until_complete调用增加了一个done_callbacktasks该停止循环.因此,当我们取消时tasks,该回调会触发,并且循环停止.此时我们调用tasks.exception,只是为了避免收到关于不从中获取异常的警告_GatheringFuture.

  • dano - 当协程引发常规异常(与用户引发键盘中断相反)时,您在此处描述的行为是否适用?我发现[对 `loop.run_forever()` 的调用继续进行,并且取消的任务无论如何都会运行](https://gist.github.com/nchammas/c1486678a0b36f38f22e)。这是预期的吗? (2认同)

ntn*_*nja 13

针对Python 3.6+更新:添加调用以loop.shutdown_asyncgens避免未完全使用的异步生成器导致内存泄漏.另外asyncio.new_event_loop现在用于asyncio.get_event_loop确保最终loop.close调用不会干扰循环的其他可能用途.

以下解决方案受到其他一些答案的启发,几乎适用于所有情况,并且不依赖于您手动跟踪需要清理的任务Ctrl+ C:

loop = asyncio.new_event_loop()
try:
    # Here `amain(loop)` is the core coroutine that may spawn any
    # number of tasks
    sys.exit(loop.run_until_complete(amain(loop)))
except KeyboardInterrupt:
    # Optionally show a message if the shutdown may take a while
    print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)

    # Do not show `asyncio.CancelledError` exceptions during shutdown
    # (a lot of these may be generated, skip this if you prefer to see them)
    def shutdown_exception_handler(loop, context):
        if "exception" not in context \
        or not isinstance(context["exception"], asyncio.CancelledError):
            loop.default_exception_handler(context)
    loop.set_exception_handler(shutdown_exception_handler)

    # Handle shutdown gracefully by waiting for all tasks to be cancelled
    tasks = asyncio.gather(*asyncio.Task.all_tasks(loop=loop), loop=loop, return_exceptions=True)
    tasks.add_done_callback(lambda t: loop.stop())
    tasks.cancel()

    # Keep the event loop running until it is either destroyed or all
    # tasks have really terminated
    while not tasks.done() and not loop.is_closed():
        loop.run_forever()
finally:
    if hasattr(loop, "shutdown_asyncgens"):  # This check is only needed for Python 3.5 and below
        loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
Run Code Online (Sandbox Code Playgroud)

上面的代码将从事件循环中获取所有当前任务,并使用asyncio.Task.all_tasks它们将它们放在单个组合的未来中asyncio.gather.然后使用future的.cancel()方法取消该将来的所有任务(都是当前正在运行的任务).该return_exceptions=True则确保所有接收到的asyncio.CancelledError异常存储的,而不是造成了日后成为出错.

上面的代码还将覆盖默认的异常处理程序,以防止asyncio.CancelledError记录生成的异常.


dco*_*les 13

Python 3.7+ 中,建议您使用asyncio.run来启动异步主函数。

asyncio.run将负责为您的程序创建事件循环,并确保在主函数退出时(包括由于KeyboardInterrupt异常)关闭事件循环并清除所有任务。

它大致类似于以下内容(请参阅 参考资料asyncio/runners.py):

def run(coro, *, debug=False):
    """`asyncio.run` is new in Python 3.7"""
    loop = asyncio.get_event_loop()
    try:
        loop.set_debug(debug)
        return loop.run_until_complete(coro)
    finally:
        try:
            all_tasks = asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)
            all_tasks.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                loop.run_until_complete(all_tasks)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()
Run Code Online (Sandbox Code Playgroud)


Amb*_*jak 5

除非您使用的是 Windows,否则请为 SIGINT(以及 SIGTERM,以便您可以将其作为服务运行)设置基于事件循环的信号处理程序。在这些处理程序中,您可以立即退出事件循环,也可以启动某种清理序列并稍后退出。

官方 Python 文档中的示例:https : //docs.python.org/3.4/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm