什么时候调用loop.close()合适?

Mar*_*rco 6 python python-asyncio

我已经尝试了asyncio一段时间并阅读了PEP;一些教程;甚至奥莱利的书

我想我已经掌握了它的窍门,但我仍然对这种行为感到困惑,loop.close()我无法完全弄清楚何时可以“安全”调用。

简而言之,我的用例是一堆阻塞的“老派”调用,我将它们包装在run_in_executor()和一个外部协程中;如果这些调用中的任何一个出现问题,我想停止进度,取消仍然未完成的调用,打印合理的日志,然后(希望是干净的)离开。

说吧,像这样:

import asyncio
import time


def blocking(num):
    time.sleep(num)
    if num == 2:
        raise ValueError("don't like 2")
    return num


async def my_coro(loop, num):
    try:
        result = await loop.run_in_executor(None, blocking, num)
        print(f"Coro {num} done")
        return result
    except asyncio.CancelledError:
        # Do some cleanup here.
        print(f"man, I was canceled: {num}")


def main():
    loop = asyncio.get_event_loop()
    tasks = []
    for num in range(5):
        tasks.append(loop.create_task(my_coro(loop, num)))

    try:
        # No point in waiting; if any of the tasks go wrong, I
        # just want to abandon everything. The ALL_DONE is not
        # a good solution here.
        future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        done, pending = loop.run_until_complete(future)
        if pending:
            print(f"Still {len(pending)} tasks pending")
            # I tried putting a stop() - with/without a run_forever()
            # after the for - same exception raised.
            #  loop.stop()
            for future in pending:
                future.cancel()

        for task in done:
            res = task.result()
            print("Task returned", res)
    except ValueError as error:
        print("Outer except --", error)
    finally:
        # I also tried placing the run_forever() here,
        # before the stop() - no dice.
        loop.stop()
        if pending:
            print("Waiting for pending futures to finish...")
            loop.run_forever()
        loop.close()
Run Code Online (Sandbox Code Playgroud)

我尝试了stop()andrun_forever()调用的几种变体,“先运行,然后停止”似乎是根据pydoc使用的,并且没有调用 toclose()产生令人满意的结果:

Coro 0 done
Coro 1 done
Still 2 tasks pending
Task returned 1
Task returned 0
Outer except -- don't like 2
Waiting for pending futures to finish...
man, I was canceled: 4
man, I was canceled: 3

Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)

但是,当添加调用时close()(如上所示),我得到两个异常:

exception calling callback for <Future at 0x104f21438 state=finished returned int>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
    self._check_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Run Code Online (Sandbox Code Playgroud)

这充其量是烦人的,但对我来说,完全令人困惑:而且,更糟糕的是,我一直无法弄清楚处理这种情况的正确方法。

因此,有两个问题:

  • 我缺少什么?我应该如何修改上面的代码,使得调用include时close()不会引发?

  • 如果我不打电话,实际会发生什么close()- 在这个微不足道的情况下,我认为这在很大程度上是多余的;但在“真正的”生产代码中可能会产生什么后果呢?

为了我个人的满足,还:

  • 为什么它会上涨?循环还想从 coros/tasks 得到什么:它们要么退出;要么退出。上调; 或被取消:这还不足以让它高兴吗?

非常感谢您提出的任何建议!

use*_*342 5

简而言之,我的用例是一堆阻塞的“老派”调用,我将它们包装在run_in_executor()和一个外部协程中;如果其中任何一个调用出错,我想停止进度,取消仍然未完成的调用

这无法按预期工作,因为run_in_executor将函数提交到线程池,并且操作系统线程无法在 Python(或公开它们的其他语言)中取消。取消由 返回的 futurerun_in_executor将尝试取消底层concurrent.futures.Future,但这只有在阻塞函数尚未运行时才有效,例如因为线程池正忙。一旦开始执行,就无法安全地取消。asyncio与线程相比,支持安全可靠的取消是使用的好处之一。

如果您正在处理同步代码,无论是传统的阻塞调用还是长时间运行的 CPU 密集型代码,您都应该运行它run_in_executor并采用一种方法来中断它。例如,代码偶尔会检查一个stop_requested标志,如果为真则退出,可能会引发异常。然后您可以通过设置适当的标志来“取消”这些任务。

我应该如何修改上面的代码,使得包含的 close() 调用不会引发?

据我所知,目前没有办法在不修改blocking顶级代码的情况下做到这一点。run_in_executor会坚持通知事件循环结果,当事件循环关闭时会失败。取消 asyncio future 并没有什么帮助,因为取消检查是在事件循环线程中执行的,并且错误发生在call_soon_threadsafe工作线程调用之前。(也许可以将检查移至工作线程,但应仔细分析它是否会导致调用cancel()和实际检查之间的竞争条件。)

为什么它会上涨?循环还想从 coros/tasks 得到什么:它们要么退出;要么退出。上调; 或被取消:这还不足以让它高兴吗?

它希望传递给已经开始的阻塞函数run_in_executorblocking在问题中称为字面意思)在事件循环关闭之前完成运行。您取消了 asyncio future,但底层并发 future 仍然想要“打电话回家”,发现循环已关闭。

目前尚不清楚这是否是 asyncio 中的错误,或者您是否不应该关闭事件循环,直到您以某种方式确保提交的所有工作run_in_executor都已完成。这样做需要进行以下更改:

  • 不要尝试取消待处理的期货。取消它们表面上看起来是正确的,但它会阻止您实现wait()这些 future,因为 asyncio 会认为它们是完整的。
  • 相反,向后台任务发送特定于应用程序的事件,通知它们需要中止。
  • loop.run_until_complete(asyncio.wait(pending))之前打电话loop.close()

通过这些修改(除了特定于应用程序的事件 - 我只是让 ssleep()完成其进程),异常没有出现。

如果我不打电话,实际会发生什么close()- 在这个微不足道的情况下,我认为这在很大程度上是多余的;但在“真正的”生产代码中可能会产生什么后果呢?

close()由于典型的事件循环与应用程序的运行时间一样长,因此在程序最后不调用应该没有问题。无论如何,操作系统都会在程序退出时清理资源。

调用loop.close()对于具有明确生命周期的事件循环非常重要。例如,库可能会为特定任务创建一个新的事件循环,在专用线程中运行它,然后处理它。未能关闭此类循环可能会泄漏其内部资源(例如用于线程间唤醒的管道)并导致程序失败。另一个例子是测试套件,它通常为每个单元测试启动一个新的事件循环,以确保测试环境的分离。


编辑:我为此问题提交了一个错误。
编辑2:该错误已由开发人员修复