使用 asyncio 处理超时

tot*_*ack 6 python python-asyncio

免责声明:这是我第一次尝试该asyncio模块。

我使用asyncio.wait以下方式来尝试支持超时功能,等待一组异步任务的所有结果。这是一个更大的库的一部分,因此我省略了一些不相关的代码。

请注意,该库已经支持提交任务以及使用 ThreadPoolExecutors 和 ProcessPoolExecutors 使用超时,因此我对使用这些选项的建议或关于为什么我使用asyncio. 上代码...

import asyncio
from contextlib import suppress

... 

class AsyncIOSubmit(Node):
    def get_results(self, futures, timeout=None):
        loop = asyncio.get_event_loop()
        finished, unfinished = loop.run_until_complete(
            asyncio.wait(futures, timeout=timeout)
        )
        if timeout and unfinished:
            # Code options in question would go here...see below.
            raise asyncio.TimeoutError
Run Code Online (Sandbox Code Playgroud)

起初我并不担心在超时时取消挂起的任务,但后来我收到了Task was destroyed but it is pending!程序退出或loop.close. 经过一番研究后,我发现了多种取消任务并等待它们实际被取消的方法:

选项1:

[task.cancel() for task in unfinished]
for task in unfinished:
    with suppress(asyncio.CancelledError):
        loop.run_until_complete(task)
Run Code Online (Sandbox Code Playgroud)

选项2:

[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))
Run Code Online (Sandbox Code Playgroud)

选项 3:

# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
    await task
Run Code Online (Sandbox Code Playgroud)

选项 4:

像这个答案一样的某种 while 循环。似乎我的其他选择更好,但包括完整性。


到目前为止,选项 1 和 2 似乎都运行良好。这两种选择都可能是“正确的”,但随着asyncio多年来的发展,网上的例子和建议要么已经过时,要么变化很大。所以我的问题是...

问题1

选项 1 和选项 2 之间有任何实际差异吗?我知道run_until_complete它将运行到未来完成为止,因此由于选项 1 按特定顺序循环,我想如果较早的任务需要更长的时间才能实际完成,它的行为可能会有所不同。我尝试查看 asyncio 源代码,以了解是否可以asyncio.wait有效地对其底层的任务/未来执行相同的操作,但这并不明显。

问题2

我假设如果其中一个任务正在进行长时间运行的阻塞操作,它实际上可能不会立即取消?也许这仅仅取决于正在使用的底层操作或库是否会立即引发 CancelledError ?也许为 asyncio 设计的库永远不应该发生这种情况?

由于我试图在这里实现超时功能,因此我对此有些敏感。如果可能这些事情可能需要很长时间才能取消,我会考虑致电cancel而不是等待它实际发生,或者设置一个非常短的超时来等待取消完成。

问题3

是否有可能loop.run_until_complete(或者实际上是对 的底层调用)由于超时以外的原因async.wait返回值?unfinished如果是这样,我显然必须稍微调整我的逻辑,但从文档来看,似乎是不可能的。

use*_*342 5

选项 1 和选项 2 之间有任何实际差异吗?

不会。选项 2 看起来更好,而且效率可能稍微更高,但它们的净效果是相同的。

我知道run_until_complete将运行到未来完成为止,因此由于选项 1 按特定顺序循环,我想如果较早的任务需要更长的时间才能实际完成,它的行为可能会有所不同。

乍一看似乎是这样,但实际上并非如此,因为loop.run_until_complete运行提交给循环的所有任务,而不仅仅是作为参数传递的任务。它只会在提供的等待完成后停止- 这就是“运行直到完成”所指的。对已计划任务的循环调用run_until_complete类似于以下异步代码:

ts = [asyncio.create_task(asyncio.sleep(i)) for i in range(1, 11)]
# takes 10s, not 55s
for t in ts:
    await t
Run Code Online (Sandbox Code Playgroud)

这在语义上又等同于以下线程代码:

ts = []
for i in range(1, 11):
    t = threading.Thread(target=time.sleep, args=(i,))
    t.start()
    ts.append(t)
# takes 10s, not 55s
for t in ts:
    t.join()
Run Code Online (Sandbox Code Playgroud)

换句话说,await t阻塞run_until_complete(t)直到t完成,但允许其他一切 - 例如之前安排的任务asyncio.create_task()也在此期间运行。因此,总运行时间将等于最长任务的运行时间,而不是它们的总和。例如,如果第一个任务恰好需要很长时间,那么所有其他任务都会同时完成,并且他们的等待根本不会休眠。

所有这些仅适用于之前已安排的等待任务。如果您尝试将其应用于协程,它将不起作用:

# runs for 55s, as expected
for i in range(1, 11):
    await asyncio.sleep(i)

# also 55s - we didn't call create_task() so it's equivalent to the above
ts = [asyncio.sleep(i) for i in range(1, 11)]
for t in ts:
    await t

# also 55s
for i in range(1, 11):
   t = threading.Thread(target=time.sleep, args=(i,))
   t.start()
   t.join()
Run Code Online (Sandbox Code Playgroud)

对于 asyncio 初学者来说,这通常是一个症结所在,他们编写与最后一个 asyncio 示例等效的代码,并期望它能够并行运行。

我尝试查看 asyncio 源代码,以了解是否可以asyncio.wait有效地对其底层的任务/未来执行相同的操作,但这并不明显。

asyncio.wait只是一个方便的 API,它做两件事:

  • 将输入参数转换为实现Future. 对于协程来说,这意味着它将它们提交到事件循环,就像 with 一样create_task,这允许它们独立运行。如果您像您一样一开始就给它任务,则会跳过此步骤。
  • 用于add_done_callback在 future 完成时收到通知,此时它恢复其调用者。

所以是的,它做同样的事情,但采用不同的实现,因为它支持更多的功能。

我假设如果其中一个任务正在进行长时间运行的阻塞操作,它实际上可能不会立即取消?

在 asyncio 中不应该有“阻塞”操作,只有那些挂起的操作,并且应该立即取消它们。例外情况是附加到 asyncio with 的阻塞代码run_in_executor,其中底层操作根本不会取消,但 asyncio 协程将立即收到异常。

也许这仅仅取决于正在使用的底层操作或库是否会立即引发 CancelledError ?

库不会raise CancelledError,它在等待点接收它,在取消发生之前它恰好暂停。对于图书馆来说,取消的影响是await ...中断其等待并立即引发CancelledError。除非被捕获,否则异常将通过函数传播并await一直调用到顶级协程,该协程的引发将CancelledError整个任务标记为已取消。行为良好的 asyncio 代码将做到这一点,可能会用来finally释放它们所持有的操作系​​统级资源。当CancelledError被捕获时,代码可以选择不重新引发它,在这种情况下,取消实际上被忽略。

是否有可能loop.run_until_complete(或者实际上是对 的底层调用async.wait)由于超时以外的原因返回未完成的值?

如果您使用return_when=asyncio.ALL_COMPLETE(默认),那应该是不可能的。很有可能return_when=FIRST_COMPLETED,那么显然与超时无关。