Sza*_*lcs 8 python generator python-asyncio
假设我有一个像这样的异步生成器:
async def event_publisher(connection, queue):
while True:
if not await connection.is_disconnected():
event = await queue.get()
yield event
else:
return
Run Code Online (Sandbox Code Playgroud)
我是这样消费的:
published_events = event_publisher(connection, queue)
async for event in published_events:
# do event processing here
Run Code Online (Sandbox Code Playgroud)
它工作得很好,但是当连接断开并且没有发布新事件时,async for它将永远等待,所以理想情况下,我想像这样强行关闭生成器:
if connection.is_disconnected():
await published_events.aclose()
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
RuntimeError: aclose(): asynchronous generator is already running
有没有办法停止处理已经运行的发电机?
似乎与这个问题有关。值得注意:
如https://gist.github.com/1st1/d9860cbf6fe2e5d243e695809aea674c所示,在 迭代时关闭同步生成器是错误的。
...
在 3.8 中,调用“aclose()”可能会因 RuntimeError 而崩溃。不再可能可靠地取消正在运行的异步生成器。
好吧,既然我们不能取消正在运行的异步生成器,那么让我们尝试取消它的运行。
import asyncio
from contextlib import suppress
async def cancel_gen(agen):
task = asyncio.create_task(agen.__anext__())
task.cancel()
with suppress(asyncio.CancelledError):
await task
await agen.aclose() # probably a good idea,
# but if you'll be getting errors, try to comment this line
Run Code Online (Sandbox Code Playgroud)
...
if connection.is_disconnected():
await cancel_gen(published_events)
Run Code Online (Sandbox Code Playgroud)
由于您没有提供可重现的示例,因此无法测试它是否有效。