我对如何使用asyncio.Queue特定的生产者-消费者模式感到困惑,在该模式中,生产者和消费者都可以同时独立运行。
首先,请考虑以下示例,该示例紧随docs中的asyncio.Queue示例:
import asyncio
import random
import time
async def worker(name, queue):
while True:
sleep_for = await queue.get()
await asyncio.sleep(sleep_for)
queue.task_done()
print(f'{name} has slept for {sleep_for:0.2f} seconds')
async def main(n):
queue = asyncio.Queue()
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
tasks = []
for i in range(n):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
for task in tasks:
task.cancel() …Run Code Online (Sandbox Code Playgroud) 我的应用程序从慢速i/o源读取数据,进行一些处理,然后将其写入本地文件.我用这样的生成器实现了这个:
import time
def io_task(x):
print("requesting data for input %s" % x)
time.sleep(1) # this simulates a blocking I/O task
return 2*x
def producer(xs):
for x in xs:
yield io_task(x)
def consumer(xs):
with open('output.txt', 'w') as fp:
for x in xs:
print("writing %s" % x)
fp.write(str(x) + '\n')
data = [1,2,3,4,5]
consumer(producer(data))
Run Code Online (Sandbox Code Playgroud)
现在我想在asyncio的帮助下并行完成这项任务,但我似乎无法弄清楚如何.对我来说,主要的问题是通过生成器直接从生产者向消费者提供数据,同时让asyncio发出多个并行请求io_task(x).而且,整体async def与@asyncio.coroutine事情让我感到困惑.
有人可以告诉我如何构建一个使用asyncio此示例代码的最小工作示例吗?
(注意:只是调用,缓冲结果然后将它们写入文件是不行的io_task().我需要一个可以超出主存的大数据集的解决方案,这就是为什么我一直在使用生成器然而,假设消费者总是比所有生产者总和更快,这是安全的.