如何加速 Python 中的异步请求

Ami*_*mar 8 python asynchronous python-3.x python-asyncio aiohttp

我想从网站下载/抓取 5000 万条日志记录。我没有一次性下载 5000 万条,而是尝试使用以下代码一次下载 1000 万条,但它一次只能处理 20,000 条(超过这个数量就会引发错误),因此下载全部数据非常耗时。目前下载 20,000 条记录需要 3-4 分钟:100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]。那么如何加速呢?

\n
import asyncio
import aiohttp
import time
import tqdm
import nest_asyncio

nest_asyncio.apply()


async def make_numbers(numbers, _numbers):
    # Async generator yielding the ids to fetch, one at a time.
    for i in range(numbers, _numbers):
        yield i


n = 0
q = 10000000


async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        async for x in make_numbers(n, q):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once

        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]


async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        print(data)


s = time.perf_counter()
try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())
except:
    print("error")

elapsed = time.perf_counter() - s
# print(f"{__file__} executed in {elapsed:0.2f} seconds.")
Run Code Online (Sandbox Code Playgroud)\n

回溯(最近一次调用最后一次):

\n
File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 986, in _wrap_create_connection\n    return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\base_events.py", line 1056, in create_connection\n    raise exceptions[0]\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\base_events.py", line 1041, in create_connection\n    sock = await self._connect_sock(\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\base_events.py", line 955, in _connect_sock\n    await self.sock_connect(sock, address)\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\proactor_events.py", line 702, in sock_connect\n    return await self._proactor.connect(sock, address)\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\tasks.py", line 328, in __wakeup\n    future.result()\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\windows_events.py", line 812, in _poll\n    value = callback(transferred, key, ov)\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\windows_events.py", line 599, in finish_connect\n    ov.getresult()\nOSError: [WinError 121] The semaphore timeout period has expired\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File "C:\\Users\\SGM\\Desktop\\xnet\\x3stackoverflow.py", line 136, in <module>\n    loop.run_until_complete(fetch())\n  File "C:\\Users\\SGM\\AppData\\Roaming\\Python\\Python39\\site-packages\\nest_asyncio.py", line 81, in run_until_complete\n    return f.result()\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\futures.py", line 201, in result\n    raise self._exception\n  File 
"C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\tasks.py", line 256, in __step\n    result = coro.send(None)\n  File "C:\\Users\\SGM\\Desktop\\xnet\\x3stackoverflow.py", line 88, in fetch\n    response = await f\n  File "C:\\Users\\SGM\\Desktop\\xnet\\x3stackoverflow.py", line 37, in _wait_for_one\n    return f.result()\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\futures.py", line 201, in result\n    raise self._exception\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\tasks.py", line 258, in __step\n    result = coro.throw(exc)\n  File "C:\\Users\\SGM\\Desktop\\xnet\\x3stackoverflow.py", line 125, in do_get\n    async with session.get(url + str(x), headers=headers) as response:\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\client.py", line 1138, in __aenter__\n    self._resp = await self._coro\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\client.py", line 535, in _request\n    conn = await self._connector.connect(\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 542, in connect\n    proto = await self._create_connection(req, traces, timeout)\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 907, in _create_connection\n    _, proto = await self._create_direct_connection(req, traces, timeout)\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 1206, in _create_direct_connection\n    raise last_exc\n  File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 1175, in _create_direct_connection\n    transp, proto = await self._wrap_create_connection(\n  File 
"C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 992, in _wrap_create_connection\n    raise client_error(req.connection_key, exc) from exc\naiohttp.client_exceptions.ClientConnectorError: Cannot connect to host example.com:80 ssl:default [The semaphore timeout period has expired]\n
Run Code Online (Sandbox Code Playgroud)\n

aar*_*ron 15

瓶颈:同时连接数

首先,瓶颈是 TCP 连接器中同时连接的总数。

aiohttp.TCPConnector 的默认限制是 limit=100。在大多数系统上(已在 macOS 上测试),您应该能够通过传入 limit=200 的 connector 将其加倍:

# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:
Run Code Online (Sandbox Code Playgroud)

所花费的时间应该会显著减少。(在 macOS 上:q = 20_000 时从 58 秒减少到 33 秒,减少 43%;q = 10_000 时从 31 秒减少到 18 秒,减少 42%。)

limit 可以配置到多大,取决于您的计算机能够打开的文件描述符数量。(在 macOS 上:可以运行 ulimit -n 查看当前上限,运行 ulimit -n 1024 将当前终端会话的上限提高到 1024,然后改用 limit=1000。与 limit=100 相比,q = 20_000 的耗时减少 76% 至 14 秒,q = 10_000 减少 71% 至 9 秒。)

支持 5000 万个请求:异步生成器

接下来,5000 万个请求看似挂起的原因仅仅是因为其数量庞大。

仅仅创建 post_tasks 中的 1000 万个协程就需要 68-98 秒(在我的机器上差异很大),之后事件循环还要进一步承受如此多的任务,其中 99.99% 都被 TCP 连接池阻塞。

我们可以使用异步生成器推迟协程的创建:

async def make_async_gen(f, n, q):
    """Lazily yield f(x) for every x produced by make_numbers(n, q)."""
    source = make_numbers(n, q)
    async for number in source:
        yield f(number)
Run Code Online (Sandbox Code Playgroud)

我们还需要一个与 asyncio.as_completed() 对应的函数,用来处理 async_gen 并控制并发数 concurrency:

from asyncio import ensure_future, events
from asyncio.queues import Queue

def as_completed_for_async_gen(fs_async_gen, concurrency):
    """Variant of asyncio.as_completed() driven by an async generator.

    Pulls at most `concurrency` awaitables from `fs_async_gen` at a time,
    schedules each as a task, and yields coroutines that each resolve to
    the result of the next task to finish.  Each completion schedules one
    replacement pull from the generator, keeping the pipeline full.
    Lines marked "# -" / "# +" show the diff against the stock
    as_completed()-style code this was adapted from.
    """
    # Finished tasks queue up here in completion order.
    done = Queue()
    loop = events.get_event_loop()
    # todo = {ensure_future(f, loop=loop) for f in set(fs)}  # -
    # Tasks currently in flight (at most `concurrency` of them).
    todo = set()                                             # +

    def _on_completion(f):
        # Done-callback: hand the finished task to the consumer and
        # schedule pulling one more awaitable from the generator.
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next())  # +

    async def _wait_for_one():
        # Await the next finished task; .result() re-raises its exception.
        f = await done.get()
        return f.result()

    async def _add_next():  # +
        # Pull the next awaitable from the generator and track it; a
        # StopAsyncIteration simply means the generator is exhausted.
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # for f in todo:                           # -
    #     f.add_done_callback(_on_completion)  # -
    # for _ in range(len(todo)):               # -
    #     yield _wait_for_one()                # -
    # Prime the pipeline with `concurrency` initial tasks.  Calling
    # run_until_complete() while a loop is already running presumably
    # relies on nest_asyncio.apply() from the caller's setup — verify.
    for _ in range(concurrency):               # +
        loop.run_until_complete(_add_next())   # +
    # NOTE(review): if the consumer re-checks `while todo:` before the
    # _add_next task scheduled in _on_completion has run, `todo` can be
    # momentarily empty and the loop may end early even though the
    # generator still has items — worth confirming.
    while todo:                                # +
        yield _wait_for_one()                  # +
Run Code Online (Sandbox Code Playgroud)

然后,我们更新fetch()

from functools import partial

CONCURRENCY = 200  # +

n = 0
q = 50_000_000

async def fetch():
    """Issue GET requests for ids n..q, keeping at most CONCURRENCY in flight."""
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = []                                                # -
        # # prepare the coroutines that post                             # -
        # async for x in make_numbers(n, q):                             # -
        #     post_tasks.append(do_get(session, url, x))                 # -
        # Prepare the coroutines generator                               # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q)  # +

        # now execute them all at once                                                                         # -
        # responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))]     # -
        # Now execute them with a specified concurrency                                                        # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]  # +
Run Code Online (Sandbox Code Playgroud)

其他限制

通过以上内容,程序可以开始处理 5000 万个请求,但是:

  1. 根据 tqdm 的估计,即使 CONCURRENCY = 1000,仍需要 8 小时左右。
  2. 您的程序可能会因为在 responses 中累积所有响应而耗尽内存并崩溃。

对于第 2 点,您可能应该这样做:

# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
# Stream results one at a time instead of accumulating them all in a list,
# so memory use stays bounded no matter how large q is.
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
    response = await f

    # Do something with response, such as writing to a local file
    # ...
Run Code Online (Sandbox Code Playgroud)

代码中有错误

do_get() 应该返回 data(即 return data):

async def do_get(session, url, x):
    """GET url + str(x) with fixed headers and return the response body text."""
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    target = url + str(x)
    async with session.get(target, headers=headers) as response:
        # Return the body (instead of printing it) so the caller can use it.
        return await response.text()
Run Code Online (Sandbox Code Playgroud)