如何在python 3.5+中将异步生成器合并为普通生成器

Max*_*ith 5 python multithreading asynchronous python-asyncio aiohttp

我在组合异步生成器并实际运行它们时遇到了麻烦。这是因为我发现运行它们的唯一方法是通过事件循环,该事件循环返回可迭代而不是生成器。让我用一个简单的例子说明一下:

假设我有一个函数google_search,它通过抓取来搜索google(我不是故意使用API​​)。它接收一个搜索字符串,并返回一个搜索结果生成器。当页面结束时,该生成器不会结束,该功能将继续到下一页。因此,google_search函数返回的可能是几乎无限的生成器(从技术上讲,它总是会终止,但在Google上进行搜索时,您通常会获得数百万次点击)

def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup
    # to parse the resulting html and yield search results
    # Assume this function works
    ......
Run Code Online (Sandbox Code Playgroud)

好的,现在我要制作一个函数,让我可以迭代多个google_search生成器。我想要这样的东西:

def google_searches(*search_strings):
    for results in zip(google_search(query) for query in search_strings):
        yield results
Run Code Online (Sandbox Code Playgroud)

这样,我可以使用简单的for循环来展开google_searches并获取结果。上面的代码效果很好,但是对于任何数量的搜索来说速度都很慢。该代码正在发送一个请求,以进行第一次搜索,然后进行第二次搜索,依此类推,直到最终产生结果。我想加快这个速度(很多)。我的第一个想法是将google_searches更改为异步函数(我正在使用python 3.6.3,并且可以使用await / async等)。然后,这将创建一个不错的异步生成器,但我只能在另一个异步函数或事件循环中运行它。使用run_until_complete(loop.gather(...))在事件循环中运行它会返回结果列表,而不是普通的生成器,这无法达到目的,因为可能有太多的搜索结果无法保存在列表中。

如何通过异步执行请求,同时使其仍然是普通生成器,来使google_searches函数更快(最好使用异步代码,但欢迎使用任何方法)?提前致谢!

Gus*_*rra 6

接受的答案会等待每个异步生成器的一个结果,然后再次调用生成器。如果数据的传输速度不一致,这可能是一个问题。下面的解决方案采用多个异步迭代(无论是否生成器),并在多个协程中同时迭代所有这些。每个协程将结果放入 a 中asyncio.Queue,然后由客户端代码迭代:

迭代器代码:

import asyncio
from async_timeout import timeout

class MergeAsyncIterator:
    def __init__(self, *it, timeout=60, maxsize=0):
        self._it = [self.iter_coro(i) for i in it]
        self.timeout = timeout
        self._futures = []
        self._queue = asyncio.Queue(maxsize=maxsize)

    def __aiter__(self):
        for it in self._it:
            f = asyncio.ensure_future(it)
            self._futures.append(f)
        return self

    async def __anext__(self):
        if all(f.done() for f in self._futures) and self._queue.empty():
            raise StopAsyncIteration
        with timeout(self.timeout):
            try:
                return await self._queue.get()
            except asyncio.CancelledError:
                raise StopAsyncIteration

    def iter_coro(self, it):
        if not hasattr(it, '__aiter__'):
            raise ValueError('Object passed must be an AsyncIterable')
        return self.aiter_to_queue(it)

    async def aiter_to_queue(self, ait):
        async for i in ait:
            await self._queue.put(i)
            await asyncio.sleep(0)
Run Code Online (Sandbox Code Playgroud)

客户端代码示例:

import random
import asyncio
from datetime import datetime

async def myaiter(name):
    for i in range(5):
        n = random.randint(0, 3)
        await asyncio.sleep(0.1 + n)
        yield (name, n)
    yield (name, 'DONE')

async def main():
    aiters = [myaiter(i) for i in 'abc']
    async for i in MergeAsyncIterator(*aiters, timeout=3):
        print(datetime.now().strftime('%H:%M:%S.%f'), i)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

输出:

14:48:28.638975 ('a', 1)
14:48:29.638822 ('b', 2)
14:48:29.741651 ('b', 0)
14:48:29.742013 ('a', 1)
14:48:30.639588 ('c', 3)
14:48:31.742705 ('c', 1)
14:48:31.847440 ('b', 2)
14:48:31.847828 ('a', 2)
14:48:31.847960 ('c', 0)
14:48:32.950166 ('c', 1)
14:48:33.948791 ('a', 2)
14:48:34.949339 ('b', 3)
14:48:35.055487 ('c', 2)
14:48:35.055928 ('c', 'DONE')
14:48:36.049977 ('a', 2)
14:48:36.050481 ('a', 'DONE')
14:48:37.050415 ('b', 2)
14:48:37.050966 ('b', 'DONE')
Run Code Online (Sandbox Code Playgroud)

PS:上面的代码使用了async_timeout第三方库。
PS2:该aiostream库的作用与上述代码相同,甚至更多。

  • 顺便说一下,这个解决方案大致相当于 `aiostream` [`merge`](http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.merge) 运算符 (3认同)
  • 惊人的!我仍然很惊讶没有更简单/更Python式的方法来做到这一点...... (2认同)

Mik*_*mov 5

def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup
Run Code Online (Sandbox Code Playgroud)

这是普通的同步发电机。您可以requests在其中使用,但如果您想使用异步aiohttp,则需要使用定义的异步生成器async def

迭代多个异步生成器更有趣。您不能使用 plain,zip因为它适用于普通可迭代对象,而不是异步可迭代对象。所以你应该实现你自己的(这也支持并发迭代)。

我做了一个小原型,我认为它可以满足您的需求:

import asyncio
import aiohttp
import time


# async versions of some builtins:
async def anext(aiterator):
    try:
        return await aiterator.__anext__()
    except StopAsyncIteration as exc:
        raise exc


def aiter(aiterable):
    return aiterable.__aiter__()


async def azip(*iterables):
    iterators = [aiter(it) for it in iterables]
    while iterators:
        results = await asyncio.gather(
            *[anext(it) for it in iterators],
            return_exceptions=True,
        )
        yield tuple(results)


# emulating grabbing:
async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def google_search(search_string):
    for i in range(999):  # big async generator
        url = 'http://httpbin.org/delay/{}'.format(i)  # increase delay to better see concurency
        j = await request(url)
        yield search_string + ' ' + str(i)


async def google_searches(*search_strings):
    async for results in azip(*[google_search(s) for s in search_strings]):
        for result in results:
            yield result


# test it works:
async def main():
    async for result in google_searches('first', 'second', 'third'):
        print(result, int(time.time()))


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()
Run Code Online (Sandbox Code Playgroud)

输出:

first 0 1514759561
second 0 1514759561
third 0 1514759561
first 1 1514759562
second 1 1514759562
third 1 1514759562
first 2 1514759564
second 2 1514759564
third 2 1514759564
first 3 1514759567
second 3 1514759567
third 3 1514759567
Run Code Online (Sandbox Code Playgroud)

时间表明不同的搜索同时运行。

  • 顺便说一下,这个解决方案大致相当于 `aiostream` [`zip`](http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.zip) 运算符 (2认同)