如何强行关闭异步生成器?

Sza*_*lcs 8 python generator python-asyncio

假设我有一个像这样的异步生成器:

async def event_publisher(connection, queue):
    while True:
        if not await connection.is_disconnected():
            event = await queue.get()
            yield event
        else:
            return
Run Code Online (Sandbox Code Playgroud)

我是这样消费的:

published_events = event_publisher(connection, queue)
async for event in published_events:
    # do event processing here
Run Code Online (Sandbox Code Playgroud)

它工作得很好,但是当连接断开并且没有发布新事件时,async for它将永远等待,所以理想情况下,我想像这样强行关闭生成器:

if connection.is_disconnected():
    await published_events.aclose()
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误:

RuntimeError: aclose(): asynchronous generator is already running

有没有办法停止处理已经运行的发电机?

Mik*_*mov 6

似乎与这个问题有关。值得注意:

https://gist.github.com/1st1/d9860cbf6fe2e5d243e695809aea674c所示,在 迭代时关闭同步生成器是错误的。

...

在 3.8 中,调用“aclose()”可能会因 RuntimeError 而崩溃。不再可能可靠地取消正在运行的异步生成器。

好吧,既然我们不能取消正在运行的异步生成器,那么让我们尝试取消它的运行。

import asyncio
from contextlib import suppress


async def cancel_gen(agen):
    task = asyncio.create_task(agen.__anext__())
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    await agen.aclose()  # probably a good idea, 
                         # but if you'll be getting errors, try to comment this line
Run Code Online (Sandbox Code Playgroud)

...

if connection.is_disconnected():
    await cancel_gen(published_events)
Run Code Online (Sandbox Code Playgroud)

由于您没有提供可重现的示例,因此无法测试它是否有效。