从事件循环中产生异步生成器数据可能吗?

gwt*_*wod 5 python python-asyncio httpx

我想使用httpx从协程内的多个同时发生的 HTTP 流请求中读取数据,并将数据返回给运行事件循环的非异步函数,而不仅仅是返回最终数据。

但是如果我让我的异步函数产生而不是返回,我会收到抱怨asyncio.as_completed()loop.run_until_complete()期望一个协程或一个 Future,而不是一个异步生成器。

因此,我可以让它工作的唯一方法是收集每个协程内的所有流数据,一旦请求完成就返回所有数据。然后收集所有协程结果,最后将其返回给非异步调用函数。

这意味着我必须将所有内容都保存在内存中,并等到最慢的请求完成后才能获取所有数据,这与流式传输 http 请求的全部意义相悖。

有什么办法可以完成这样的事情吗?我目前的愚蠢实现是这样的:

def collect_data(urls):
    """Non-async function wishing it was a non-async generator"""

    async def stream(async_client, url, payload):
        data = []
        async with async_client.stream("GET", url=url) as ar:
            ar.raise_for_status()
            async for line in ar.aiter_lines():
                data.append(line)
                # would like to yield each line here
        return data

    async def execute_tasks(urls):
        all_data = []
        async with httpx.AsyncClient() as async_client:
            tasks = [stream(async_client, url) for url in urls]
            for coroutine in asyncio.as_completed(tasks):
                all_data += await coroutine
                # would like to iterate and yield each line here
        return all_events

    try:
        loop = asyncio.get_event_loop()
        data = loop.run_until_complete(execute_tasks(urls=urls))
        return data
        # would like to iterate and yield the data here as it becomes available
    finally:
        loop.close()
Run Code Online (Sandbox Code Playgroud)

编辑:我也尝试了一些使用asyncio.Queuetrio内存通道的解决方案,但由于我只能从异步范围内的那些中读取,因此我无法更接近解决方案

编辑 2:我想从非异步生成器中使用它的原因是我想从使用 Django Rest Framework 流 API 的 Django 应用程序中使用它。

use*_*342 5

通常你应该只做collect_data异步,并在整个过程中使用异步代码——这就是 asyncio 的设计用途。但如果由于某种原因不可行,您可以通过应用一些胶水代码手动迭代异步迭代器:

def iter_over_async(ait, loop):
    ait = ait.__aiter__()
    async def get_next():
        try:
            obj = await ait.__anext__()
            return False, obj
        except StopAsyncIteration:
            return True, None
    while True:
        done, obj = loop.run_until_complete(get_next())
        if done:
            break
        yield obj
Run Code Online (Sandbox Code Playgroud)

上面的工作方式是提供一个异步闭包,它使用__anext__魔法方法不断从异步迭代器中检索值,并在对象到达时返回它们。这个异步闭包是run_until_complete()在普通同步生成器内的循环中调用的。(闭包实际上返回一对 done 指示符和实际对象,以避免StopAsyncIteration通过传播run_until_complete,这可能不受支持。)

有了这个,您可以制作execute_tasks一个异步生成器(async defwith yield)并使用以下方法对其进行迭代:

for chunk in iter_over_async(execute_tasks(urls), loop):
    ...
Run Code Online (Sandbox Code Playgroud)

请注意,此方法与 不兼容asyncio.run,并且可能会在以后导致问题。

  • aiostream 很好用。:) 请注意,您的代码可以在 Python 3.7 及更高版本中正常工作,`loop.run_until_complete()` 不会去任何地方。只是一般建议正在转向“asyncio.run”,因此在某些时候您的设计可能会落后,您可能需要重新考虑它。 (2认同)