kla*_*ann 4 python multithreading generator python-asyncio
我的应用程序从慢速i/o源读取数据,进行一些处理,然后将其写入本地文件.我用这样的生成器实现了这个:
import time
def io_task(x):
print("requesting data for input %s" % x)
time.sleep(1) # this simulates a blocking I/O task
return 2*x
def producer(xs):
for x in xs:
yield io_task(x)
def consumer(xs):
with open('output.txt', 'w') as fp:
for x in xs:
print("writing %s" % x)
fp.write(str(x) + '\n')
data = [1,2,3,4,5]
consumer(producer(data))
Run Code Online (Sandbox Code Playgroud)
现在我想在asyncio的帮助下并行完成这项任务,但我似乎无法弄清楚如何.对我来说,主要的问题是通过生成器直接从生产者向消费者提供数据,同时让asyncio发出多个并行请求io_task(x).而且,整体async def与@asyncio.coroutine事情让我感到困惑.
有人可以告诉我如何构建一个使用asyncio此示例代码的最小工作示例吗?
(注意:只是调用,缓冲结果然后将它们写入文件是不行的io_task().我需要一个可以超出主存的大数据集的解决方案,这就是为什么我一直在使用生成器然而,假设消费者总是比所有生产者总和更快,这是安全的.
从python 3.6和异步生成器开始,需要进行很少的更改才能使代码与asyncio兼容.
该io_task函数成为一个协程:
async def io_task(x):
await asyncio.sleep(1)
return 2*x
Run Code Online (Sandbox Code Playgroud)
所述producer发电机变为异步发电机:
async def producer(xs):
for x in xs:
yield await io_task(x)
Run Code Online (Sandbox Code Playgroud)
该consumer函数成为协程并使用aiofiles,异步上下文管理和异步迭代:
async def consumer(xs):
async with aiofiles.open('output.txt', 'w') as fp:
async for x in xs:
await fp.write(str(x) + '\n')
Run Code Online (Sandbox Code Playgroud)
主协程在事件循环中运行:
data = [1,2,3,4,5]
main = consumer(producer(data))
loop = asyncio.get_event_loop()
loop.run_until_complete(main)
loop.close()
Run Code Online (Sandbox Code Playgroud)
此外,您可以考虑使用aiostream来管理生产者和消费者之间的一些处理操作.
编辑:使用as_completed可以在生产者端轻松地同时运行不同的I/O任务:
async def producer(xs):
coros = [io_task(x) for x in xs]
for future in asyncio.as_completed(coros):
yield await future
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
885 次 |
| 最近记录: |