Chr*_*hes 5 python python-asyncio
我有一个asyncio.Condition名为cond. 我想等它,但只能等那么久才放弃。由于asyncio.Condition.wait不需要超时,因此无法直接完成此操作。应该用于包装并提供超时的文档状态asyncio.wait_for:
asyncio.wait_for() 函数可用于在超时后取消任务。
因此,我们得出以下解决方案:
async def coro():
print("Taking lock...")
async with cond:
print("Lock acquired.")
print("Waiting!")
await asyncio.wait_for(cond.wait(), timeout=999)
print("Was notified!")
print("Lock released.")
Run Code Online (Sandbox Code Playgroud)
现在假设它coro自己在运行后五秒被取消。这将引发CancelledError的wait_for,其中取消cond.wait前再上调错误。然后错误传播到coro,由于async with块,它隐式地尝试释放 中的锁cond。但是,当前未持有该锁;cond.wait已被取消,但没有机会处理该取消并重新获取锁。因此,我们得到一个丑陋的异常,如下所示:
Taking lock...
Lock acquired.
Waiting!
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at [REDACTED]> exception=RuntimeError('Lock is not acquired.',)>
Traceback (most recent call last):
[REDACTED], in coro
await asyncio.wait_for(cond.wait(), timeout=999)
[REDACTED], in wait_for
yield from waiter
concurrent.futures._base.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
[REDACTED], in coro
print("Was notified!")
[REDACTED], in coro
res = func(*args, **kw)
[REDACTED], in __aexit__
self.release()
[REDACTED], in release
raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.
Run Code Online (Sandbox Code Playgroud)
换句话说,在处理 时CancelledError,从试图释放未持有的锁中coro引发了RuntimeError。堆栈跟踪显示该print("Was notified!")行的原因是因为这是违规async with块的最后一行。
在编写这个问题并进一步调查时,我在 Python 错误跟踪器上偶然发现了类似的问题,最终检查了asyncio源代码,并确定这实际上是一个错误asyncio本身。
我已将其提交给问题跟踪器,供遇到相同问题的人使用,并使用我创建的解决方法回答了我自己的问题。
编辑:根据 ParkerD 的要求,这里是产生上述问题的完整可运行示例:
编辑 2:更新示例以使用Python 3.7+的新功能asyncio.run和asyncio.create_task功能
import asyncio
async def coro():
cond = asyncio.Condition()
print("Taking lock...")
async with cond:
print("Lock acquired.")
print("Waiting!")
await asyncio.wait_for(cond.wait(), timeout=999)
print("Was notified!")
print("Lock released.")
async def cancel_after_5(c):
task = asyncio.create_task(c)
await asyncio.sleep(5)
task.cancel()
await asyncio.wait([task])
asyncio.run(cancel_after_5(coro()))
Run Code Online (Sandbox Code Playgroud)
正如问题末尾所述,我确定该问题实际上是库中的错误。我将重申该错误的问题跟踪器位于此处,并介绍我的解决方法。
以下函数基于wait_for其自身(来源此处),并且是专门用于等待条件的版本,并额外保证取消它是安全的。
调用wait_on_condition_with_timeout(cond, timeout)大致相当于asyncio.wait_for(cond.wait(), timeout)。
async def wait_on_condition_with_timeout(condition: asyncio.Condition, timeout: float) -> bool:
loop = asyncio.get_event_loop()
# Create a future that will be triggered by either completion or timeout.
waiter = loop.create_future()
# Callback to trigger the future. The varargs are there to consume and void any arguments passed.
# This allows the same callback to be used in loop.call_later and wait_task.add_done_callback,
# which automatically passes the finished future in.
def release_waiter(*_):
if not waiter.done():
waiter.set_result(None)
# Set up the timeout
timeout_handle = loop.call_later(timeout, release_waiter)
# Launch the wait task
wait_task = loop.create_task(condition.wait())
wait_task.add_done_callback(release_waiter)
try:
await waiter # Returns on wait complete or timeout
if wait_task.done():
return True
else:
raise asyncio.TimeoutError()
except (asyncio.TimeoutError, asyncio.CancelledError):
# If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
# then re-raise.
wait_task.remove_done_callback(release_waiter)
wait_task.cancel()
await asyncio.wait([wait_task])
raise
finally:
timeout_handle.cancel()
Run Code Online (Sandbox Code Playgroud)
关键部分是,如果发生超时或取消,该方法将等待条件重新获取锁,然后再重新引发异常:
except (asyncio.TimeoutError, asyncio.CancelledError):
# If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
# then re-raise.
wait_task.remove_done_callback(release_waiter)
wait_task.cancel()
await asyncio.wait([wait_task]) # This line is missing from the real wait_for
raise
Run Code Online (Sandbox Code Playgroud)
我已经在 Python 3.6.9 上对此进行了测试,它运行得很好。3.7 和 3.8 中也存在同样的错误,所以我想它对这些版本也很有用。如果您想知道错误何时会得到修复,请查看上面的问题跟踪器。如果您想要 s 以外的版本Condition,则更改参数和行应该很简单create_task。