使用asyncio.Queue进行生产者-消费者流

Bra*_*mon 8 python python-3.x async-await python-asyncio

我对如何使用asyncio.Queue特定的生产者-消费者模式感到困惑,在该模式中,生产者和消费者都可以同时独立运行。

首先,请考虑以下示例,该示例紧随docs中的asyncio.Queue示例:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:0.2f} seconds')

async def main(n):
    queue = asyncio.Queue()
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    tasks = []
    for i in range(n):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

if __name__ == '__main__':
    import sys
    n = 3 if len(sys.argv) == 1 else sys.argv[1]
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

关于此脚本,有一个更详细的细节:queue.put_nowait(sleep_for)通过常规的for循环将项目同步放入队列。

我的目标是创建一个使用async def worker()(或consumer())和的脚本async def producer()。两者都应安排为同时运行。没有一个消费者协程明确地与生产者绑定或链接。

我如何修改上面的程序,以便生产者是可以与消费者/工人同时调度的协程?


PYMOTW还有另一个例子。它要求生产者提前知道消费者的数量,None并向消费者发出生产已经完成的信号。

use*_*342 13

我如何修改上面的程序,以便生产者是可以与消费者/工人同时调度的协程?

可以对示例进行概括,而无需更改其基本逻辑:

  • 将插入循环移到单独的生产者协程。
  • 在后台启动消费者,让他们处理生产的商品。
  • 等待生产者完成操作await,例如使用await producer()await gather(*producers),等等。
  • 一旦所有生产者都完成了,等待剩余的生产项目与 await queue.join()
  • 取消消费者,所有消费者现在都在闲着等待下一个永远不会到达的排队的商品。

这是实现上述内容的示例:

import asyncio, random, time

async def rnd_sleep(t):
    # sleep for T seconds on average
    await asyncio.sleep(t * random.random() * 2)

async def producer(queue):
    while True:
        token = random.random()
        print(f'produced {token}')
        if token < .05:
            break
        await queue.put(token)
        await rnd_sleep(.1)

async def consumer(queue):
    while True:
        token = await queue.get()
        await rnd_sleep(.3)
        queue.task_done()
        print(f'consumed {token}')

async def main():
    queue = asyncio.Queue()

    # fire up the both producers and consumers
    producers = [asyncio.create_task(producer(queue))
                 for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue))
                 for _ in range(10)]

    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print('---- done producing')

    # wait for the remaining tasks to be processed
    await queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

  • 我正在用 `aiohttp` 和 [`aiofiles`](https://github.com/Tinche/aiofiles) 写关于 asyncio 并且想在一个部分提到队列——你介意我链接到并引用这个吗回答? (11认同)
  • @BradSolomon 当然,继续! (6认同)
  • @pylang 如果您的代码受 CPU 限制(或以其他未由 asyncio 处理的方式阻塞),则 asyncio 不会自动交错它。在这种情况下,使用 [`run_in_executor`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) 将阻塞代码卸载到线程池。然后你会写`if await loop.run_in_executor(None, lambda: pathlib.Path(...).exists()): ...` (3认同)
  • 我正在尝试调整它来测试文件是否存在于目录中,但它似乎并没有异步地交错生产者和消费者。所有的生产者首先被生成,其次才是消费者。我如何修改它以在 cpu-bound 进程上工作,例如 `if pathlib.Path().exists(): ...`。 (2认同)
  • @CpILL当然,你可以这样做(这就是“假设异常是可恢复的”所指的),只要记住无论如何也要调用“task_done”,以便当前的出队被视为已解决。队列不会介意你读取了同一个对象,它只是计算入队和出队的数量。 (2认同)