使用asyncio并行化生成器

kla*_*ann 4 python multithreading generator python-asyncio

我的应用程序从慢速i/o源读取数据,进行一些处理,然后将其写入本地文件.我用这样的生成器实现了这个:

import time

def io_task(x):
    print("requesting data for input %s" % x)
    time.sleep(1)   # this simulates a blocking I/O task
    return 2*x

def producer(xs):
    for x in xs:
        yield io_task(x)

def consumer(xs):
    with open('output.txt', 'w') as fp:
        for x in xs:
            print("writing %s" % x)
            fp.write(str(x) + '\n')

data = [1,2,3,4,5]
consumer(producer(data))
Run Code Online (Sandbox Code Playgroud)

现在我想在asyncio的帮助下并行完成这项任务,但我似乎无法弄清楚如何.对我来说,主要的问题是通过生成器直接从生产者向消费者提供数据,同时让asyncio发出多个并行请求io_task(x).而且,整体async def@asyncio.coroutine事情让我感到困惑.

有人可以告诉我如何构建一个使用asyncio此示例代码的最小工作示例吗?

(注意:只是调用,缓冲结果然后将它们写入文件是不行的io_task().我需要一个可以超出主存的大数据集的解决方案,这就是为什么我一直在使用生成器然而,假设消费者总是比所有生产者总和更快,这是安全的.

Vin*_*ent 6

从python 3.6和异步生成器开始,需要进行很少的更改才能使代码与asyncio兼容.

io_task函数成为一个协程:

async def io_task(x):
    await asyncio.sleep(1)
    return 2*x
Run Code Online (Sandbox Code Playgroud)

所述producer发电机变为异步发电机:

async def producer(xs):
    for x in xs:
        yield await io_task(x)
Run Code Online (Sandbox Code Playgroud)

consumer函数成为协程并使用aiofiles,异步上下文管理和异步迭代:

async def consumer(xs):
    async with aiofiles.open('output.txt', 'w') as fp:
        async for x in xs:
            await fp.write(str(x) + '\n')
Run Code Online (Sandbox Code Playgroud)

主协程在事件循环中运行:

data = [1,2,3,4,5]
main = consumer(producer(data))
loop = asyncio.get_event_loop()
loop.run_until_complete(main)
loop.close()
Run Code Online (Sandbox Code Playgroud)

此外,您可以考虑使用aiostream来管理生产者和消费者之间的一些处理操作.


编辑:使用as_completed可以在生产者端轻松地同时运行不同的I/O任务:

async def producer(xs):
    coros = [io_task(x) for x in xs]
    for future in asyncio.as_completed(coros):
        yield await future
Run Code Online (Sandbox Code Playgroud)

  • 嗨,关于这个例子,我有两个问题: 1. 如果 io_task 是一个 async_generator,那么在生产者中替换“yield await io_task(x)”是什么,以便它可以像“yield from”一样工作?2.如果我的数据非常大,不得不使用generator,我想避免构建一个超长的coros list,它也可以是一个generator吗? (2认同)