Python 异步生成器不是异步的

cjs*_*312 7 python asynchronous generator

我的代码如下。我希望两个 sleep 可以共享相同的时间范围并花费 1+2*3=7 秒来运行脚本。但似乎发生了一些错误,所以它仍然需要 3*(1+2) 秒。

有什么想法如何修改代码吗?

import asyncio

async def g():
    for i in range(3):
        await asyncio.sleep(1)
        yield i

async def main():
    async for x in g():
        print(x)
        await asyncio.sleep(2)

loop = asyncio.get_event_loop()
res = loop.run_until_complete(main())
loop.close()

Run Code Online (Sandbox Code Playgroud)

Mis*_*agi 6

async/await是交错的任务,而不是函数/发电机。例如,当您 时await asyncio.sleep(1),您当前的协程随着睡眠而延迟。类似地,a 将async for其协程延迟到下一项准备就绪。

为了运行单独的功能,您必须将每个部分创建为单独的任务。使用 aQueue在他们之间交换物品 - 任务只会被延迟,直到他们交换了物品。

from asyncio import Queue, sleep, run, gather


# the original async generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


async def producer(queue: Queue):
    async for i in g():
        print('send', i)
        await queue.put(i)  # resume once item is fetched
    await queue.put(None)


async def consumer(queue: Queue):
    x = await queue.get()  # resume once item is fetched
    while x is not None:
        print('got', x)
        await sleep(2)
        x = await queue.get()


async def main():
    queue = Queue()
    # tasks only share the queue
    await gather(
        producer(queue),
        consumer(queue),
    )


run(main())
Run Code Online (Sandbox Code Playgroud)

如果您经常需要此功能,您还可以将其放入包装异步可迭代对象的辅助对象中。helper 封装了队列和单独的任务。您可以直接在async for语句中的异步可迭代对象上应用帮助程序。

from asyncio import Queue, sleep, run, ensure_future


# helper to consume iterable as concurrent task
async def _enqueue_items(async_iterable, queue: Queue, sentinel):
    async for item in async_iterable:
        await queue.put(item)
    await queue.put(sentinel)


async def concurrent(async_iterable):
    """Concurrently fetch items from ``async_iterable``"""
    queue = Queue()
    sentinel = object()
    consumer = ensure_future(  # concurrently fetch items for the iterable
        _enqueue_items(async_iterable, queue, sentinel)
    )
    try:
        item = await queue.get()
        while item is not sentinel:
            yield item
            item = await queue.get()
    finally:
        consumer.cancel()


# the original generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


# the original main - modified with `concurrent`
async def main():
    async for x in concurrent(g()):
        print(x)
        await sleep(2)


run(main())
Run Code Online (Sandbox Code Playgroud)