相关疑难解决方法(0)

使用asyncio.Queue进行生产者-消费者流

我对如何使用asyncio.Queue特定的生产者-消费者模式感到困惑,在该模式中,生产者和消费者都可以同时独立运行。

首先,请考虑以下示例,该示例紧随docs中的asyncio.Queue示例:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:0.2f} seconds')

async def main(n):
    queue = asyncio.Queue()
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    tasks = []
    for i in range(n):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    for task in tasks:
        task.cancel() …
Run Code Online (Sandbox Code Playgroud)

python python-3.x async-await python-asyncio

8
推荐指数
1
解决办法
5720
查看次数

使用asyncio并行化生成器

我的应用程序从慢速i/o源读取数据,进行一些处理,然后将其写入本地文件.我用这样的生成器实现了这个:

import time

def io_task(x):
    print("requesting data for input %s" % x)
    time.sleep(1)   # this simulates a blocking I/O task
    return 2*x

def producer(xs):
    for x in xs:
        yield io_task(x)

def consumer(xs):
    with open('output.txt', 'w') as fp:
        for x in xs:
            print("writing %s" % x)
            fp.write(str(x) + '\n')

data = [1,2,3,4,5]
consumer(producer(data))
Run Code Online (Sandbox Code Playgroud)

现在我想在asyncio的帮助下并行完成这项任务,但我似乎无法弄清楚如何.对我来说,主要的问题是通过生成器直接从生产者向消费者提供数据,同时让asyncio发出多个并行请求io_task(x).而且,整体async def@asyncio.coroutine事情让我感到困惑.

有人可以告诉我如何构建一个使用asyncio此示例代码的最小工作示例吗?

(注意:只是调用,缓冲结果然后将它们写入文件是不行的io_task().我需要一个可以超出主存的大数据集的解决方案,这就是为什么我一直在使用生成器然而,假设消费者总是比所有生产者总和更快,这是安全的.

python multithreading generator python-asyncio

4
推荐指数
1
解决办法
885
查看次数