Max*_*ith 5 python multithreading asynchronous python-asyncio aiohttp
我在组合异步生成器并实际运行它们时遇到了麻烦。这是因为我发现运行它们的唯一方法是通过事件循环,该事件循环返回可迭代而不是生成器。让我用一个简单的例子说明一下:
假设我有一个函数google_search,它通过抓取来搜索google(我不是故意使用API)。它接收一个搜索字符串,并返回一个搜索结果生成器。当页面结束时,该生成器不会结束,该功能将继续到下一页。因此,google_search函数返回的可能是几乎无限的生成器(从技术上讲,它总是会终止,但在Google上进行搜索时,您通常会获得数百万次点击)
def google_search(search_string):
# Basically uses requests/aiohttp and beautifulsoup
# to parse the resulting html and yield search results
# Assume this function works
......
Run Code Online (Sandbox Code Playgroud)
好的,现在我要制作一个函数,让我可以迭代多个google_search生成器。我想要这样的东西:
def google_searches(*search_strings):
for results in zip(google_search(query) for query in search_strings):
yield results
Run Code Online (Sandbox Code Playgroud)
这样,我可以使用简单的for循环来展开google_searches并获取结果。上面的代码效果很好,但是对于任何数量的搜索来说速度都很慢。该代码正在发送一个请求,以进行第一次搜索,然后进行第二次搜索,依此类推,直到最终产生结果。我想加快这个速度(很多)。我的第一个想法是将google_searches更改为异步函数(我正在使用python 3.6.3,并且可以使用await / async等)。然后,这将创建一个不错的异步生成器,但我只能在另一个异步函数或事件循环中运行它。使用run_until_complete(loop.gather(...))在事件循环中运行它会返回结果列表,而不是普通的生成器,这无法达到目的,因为可能有太多的搜索结果无法保存在列表中。
如何通过异步执行请求,同时使其仍然是普通生成器,来使google_searches函数更快(最好使用异步代码,但欢迎使用任何方法)?提前致谢!
接受的答案会等待每个异步生成器的一个结果,然后再次调用生成器。如果数据的传输速度不一致,这可能是一个问题。下面的解决方案采用多个异步迭代(无论是否生成器),并在多个协程中同时迭代所有这些。每个协程将结果放入 a 中asyncio.Queue,然后由客户端代码迭代:
迭代器代码:
import asyncio
from async_timeout import timeout
class MergeAsyncIterator:
def __init__(self, *it, timeout=60, maxsize=0):
self._it = [self.iter_coro(i) for i in it]
self.timeout = timeout
self._futures = []
self._queue = asyncio.Queue(maxsize=maxsize)
def __aiter__(self):
for it in self._it:
f = asyncio.ensure_future(it)
self._futures.append(f)
return self
async def __anext__(self):
if all(f.done() for f in self._futures) and self._queue.empty():
raise StopAsyncIteration
with timeout(self.timeout):
try:
return await self._queue.get()
except asyncio.CancelledError:
raise StopAsyncIteration
def iter_coro(self, it):
if not hasattr(it, '__aiter__'):
raise ValueError('Object passed must be an AsyncIterable')
return self.aiter_to_queue(it)
async def aiter_to_queue(self, ait):
async for i in ait:
await self._queue.put(i)
await asyncio.sleep(0)
Run Code Online (Sandbox Code Playgroud)
客户端代码示例:
import random
import asyncio
from datetime import datetime
async def myaiter(name):
for i in range(5):
n = random.randint(0, 3)
await asyncio.sleep(0.1 + n)
yield (name, n)
yield (name, 'DONE')
async def main():
aiters = [myaiter(i) for i in 'abc']
async for i in MergeAsyncIterator(*aiters, timeout=3):
print(datetime.now().strftime('%H:%M:%S.%f'), i)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
输出:
14:48:28.638975 ('a', 1)
14:48:29.638822 ('b', 2)
14:48:29.741651 ('b', 0)
14:48:29.742013 ('a', 1)
14:48:30.639588 ('c', 3)
14:48:31.742705 ('c', 1)
14:48:31.847440 ('b', 2)
14:48:31.847828 ('a', 2)
14:48:31.847960 ('c', 0)
14:48:32.950166 ('c', 1)
14:48:33.948791 ('a', 2)
14:48:34.949339 ('b', 3)
14:48:35.055487 ('c', 2)
14:48:35.055928 ('c', 'DONE')
14:48:36.049977 ('a', 2)
14:48:36.050481 ('a', 'DONE')
14:48:37.050415 ('b', 2)
14:48:37.050966 ('b', 'DONE')
Run Code Online (Sandbox Code Playgroud)
PS:上面的代码使用了async_timeout第三方库。
PS2:该aiostream库的作用与上述代码相同,甚至更多。
def google_search(search_string):
# Basically uses requests/aiohttp and beautifulsoup
Run Code Online (Sandbox Code Playgroud)
这是普通的同步发电机。您可以requests在其中使用,但如果您想使用异步aiohttp,则需要使用定义的异步生成器async def。
迭代多个异步生成器更有趣。您不能使用 plain,zip因为它适用于普通可迭代对象,而不是异步可迭代对象。所以你应该实现你自己的(这也支持并发迭代)。
我做了一个小原型,我认为它可以满足您的需求:
import asyncio
import aiohttp
import time
# async versions of some builtins:
async def anext(aiterator):
try:
return await aiterator.__anext__()
except StopAsyncIteration as exc:
raise exc
def aiter(aiterable):
return aiterable.__aiter__()
async def azip(*iterables):
iterators = [aiter(it) for it in iterables]
while iterators:
results = await asyncio.gather(
*[anext(it) for it in iterators],
return_exceptions=True,
)
yield tuple(results)
# emulating grabbing:
async def request(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
async def google_search(search_string):
for i in range(999): # big async generator
url = 'http://httpbin.org/delay/{}'.format(i) # increase delay to better see concurency
j = await request(url)
yield search_string + ' ' + str(i)
async def google_searches(*search_strings):
async for results in azip(*[google_search(s) for s in search_strings]):
for result in results:
yield result
# test it works:
async def main():
async for result in google_searches('first', 'second', 'third'):
print(result, int(time.time()))
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
Run Code Online (Sandbox Code Playgroud)
输出:
first 0 1514759561
second 0 1514759561
third 0 1514759561
first 1 1514759562
second 1 1514759562
third 1 1514759562
first 2 1514759564
second 2 1514759564
third 2 1514759564
first 3 1514759567
second 3 1514759567
third 3 1514759567
Run Code Online (Sandbox Code Playgroud)
时间表明不同的搜索同时运行。