在协同程序完成时创建生成协程结果的生成器

Mar*_*ery 6 python async-await python-asyncio

目前,我有一个低效的同步生成器,它按顺序发出许多HTTP请求并产生结果.我想使用asyncioaiohttp并行化请求,从而加速这个生成器,但我想保持它作为一个普通的生成器(而不是PEP 525异步生成器),以便调用它的非异步代码不需要待修改.我怎样才能创建这样的发电机?

Mar*_*ery 11

asyncio.as_completed(),目前几乎没有记录,采用可迭代的协程或期货,并按输入期货完成的顺序返回可迭代的期货.通常,你会awaitasync函数内部循环其结果和成员...

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)

loop = asyncio.get_event_loop()

# Prints 'second', then 'third', then 'first'
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

...但是出于这个问题的目的,我们想要的是能够从普通的生成器中产生这些结果,这样正常的同步代码就可以消耗它们而不必知道async函数是在引擎盖下使用的.我们可以通过呼叫loop.run_until_complete()我们的as_completed电话所产生的期货来做到这一点......

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)
Run Code Online (Sandbox Code Playgroud)

这样一来,我们已经暴露了我们的异步代码,非异步土地在不需要调用者定义任何功能的方式async,或者即使知道ordinary_generator正在使用asyncio引擎盖下.

作为替代方案的实施ordinary_generator(),在某些情况下,提供了更多的灵活性,我们可以反复调用asyncio.wait()FIRST_COMPLETED标志,而不是遍历as_completed():

import concurrent.futures

def ordinary_generator():
    loop = asyncio.get_event_loop()
    pending = [first(), second(), third()]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(
                pending,
                return_when=concurrent.futures.FIRST_COMPLETED
            )
        )
        for job in done:
            yield job.result()
Run Code Online (Sandbox Code Playgroud)

这种维护pending作业列表的方法具有以下优点:我们可以对其进行调整以便动态地将作业添加到pending列表中.这在我们的异步作业可以向队列中添加不可预测数量的其他作业的用例中非常有用 - 例如跟随其访问的每个页面上的所有链接的Web蜘蛛.