aiohttp client_exception ServerDisconnectedError - 这是 API 服务器的问题还是 aiohttp 或我的代码?

hyp*_*hen 5 python-3.x python-asyncio aiohttp

每当我对使用 asyncio 和 aiohttp 访问的 API 执行超过 200 个请求时,我都会收到 aiohttp client_exception.ServerDisconnectedError。它似乎不是我的代码,因为它与较少数量的请求一致地工作,但在任何较大数量上都失败。试图了解此错误是否与 aiohttp、我的代码或 API 端点本身有关?网上似乎没有太多关于这个的信息。

  Traceback (most recent call last):
  File "C:/usr/PycharmProjects/api_framework/api_framework.py", line 27, in <module>
    stuff = abc.do_stuff_2()
  File "C:\usr\PycharmProjects\api_framework\api\abc\abc.py", line 72, in do_stuff
    self.queue_manager(self.do_stuff(json_data))
  File "C:\usr\PycharmProjects\api_framework\api\abc\abc.py", line 115, in queue_manager
    loop.run_until_complete(future)
  File "C:\Python36x64\lib\asyncio\base_events.py", line 466, in run_until_complete
    return future.result()
  File "C:\usr\PycharmProjects\api_framework\api\abc\abc.py", line 96, in do_stuff
    result = await asyncio.gather(*tasks)
  File "C:\usr\PycharmProjects\api_framework\api\abc\abc.py", line 140, in async_post
    async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
  File "C:\Python36x64\lib\site-packages\aiohttp\client.py", line 843, in __aenter__
    self._resp = await self._coro
  File "C:\Python36x64\lib\site-packages\aiohttp\client.py", line 387, in _request
    await resp.start(conn)
  File "C:\Python36x64\lib\site-packages\aiohttp\client_reqrep.py", line 748, in start
    message, payload = await self._protocol.read()
  File "C:\Python36x64\lib\site-packages\aiohttp\streams.py", line 533, in read
    await self._waiter
aiohttp.client_exceptions.ServerDisconnectedError: None
Run Code Online (Sandbox Code Playgroud)

这是生成异步请求的一些代码:

    def some_other_method(self):
        self.queue_manager(self.do_stuff(all_the_tasks))

    def queue_manager(self, method):
        print('starting event queue')
        loop = asyncio.get_event_loop()
        future = asyncio.ensure_future(method)
        loop.run_until_complete(future)
        loop.close()

    async def async_post(self, resource, session, data):
        async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
            resp = await response.read()
        return resp

    async def do_stuff(self, data):
        print('queueing tasks')

        tasks = []
        async with aiohttp.ClientSession() as session:
            for row in data:
                task = asyncio.ensure_future(self.async_post('my_api_endpoint', session, row))
                tasks.append(task)
            result = await asyncio.gather(*tasks)
            self.load_results(result)
Run Code Online (Sandbox Code Playgroud)

任务完成后,self.load_results() 方法只会解析 json 并更新数据库。

Ant*_*ert 30

这很可能是由 HTTP 服务器的配置引起的。ServerDisconnectedError 至少有两个可能的原因:

  1. 服务器可以限制可以从单个 IP 地址建立的并行 TCP 连接的数量。默认情况下,aiohttp已经将并行连接数限制为100。您可以尝试减少限制,看看是否能解决问题。为此,您可以创建TCPConnector具有不同限制值的自定义并将其传递给ClientSession
        connector = aiohttp.TCPConnector(limit=50)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Use your session as usual here
Run Code Online (Sandbox Code Playgroud)
  1. 服务器可以限制 TCP 连接的持续时间。默认情况下,aiohttp使用 HTTP keep-alive,以便同一个 TCP 连接可以用于多个请求。这提高了性能,因为不必为每个请求建立新的 TCP 连接。但是,某些服务器会限制 TCP 连接的持续时间,如果您对多个请求使用同一个 TCP 连接,服务器可能会在您完成之前将其关闭。您可以禁用 HTTP keep-alive 作为解决方法。为此,您可以创建一个TCPConnector参数force_close设置为 的自定义True,并将其传递给ClientSession
        connector = aiohttp.TCPConnector(force_close=True)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Use your session as usual here
Run Code Online (Sandbox Code Playgroud)

我遇到了同样的问题,禁用 HTTP keep-alive 是我的解决方案。希望这可以帮助。

  • 这个答案也对我有帮助,但是你能解释一下信号量和TCPConnector限制之间的关系吗?如果我有 Semaphore(20) 和 TCPConnector(limit=10) vs 如果我有 Semaphore(10) 和 TCPConnector(limit=20)?我对他们的相互作用感到困惑。 (3认同)

gra*_*uls 5

这很可能是服务器的 API 对异步完成的多个请求不满意。您可以使用 asyncio 的semaphores限制并发调用的数量。

在你的情况下,我会在上下文管理器中使用它:

async def do_stuff(self, data):
    print('queueing tasks')

    tasks = []
    semaphore = asyncio.Semaphore(200)

    async with semaphore:
        async with aiohttp.ClientSession() as session:
            for row in data:
                task = asyncio.ensure_future(self.async_post('my_api_endpoint', session, row))
                tasks.append(task)
            result = await asyncio.gather(*tasks)
            self.load_results(result)
Run Code Online (Sandbox Code Playgroud)

  • @Quanta `aiohttp.Semaphore` 并未被弃用。“loop”参数已被弃用,因为该参数在“aiohttp”中被全面删除,请参阅https://docs.python.org/3/library/asyncio-sync.html#asyncio.Semaphore (6认同)
  • `aiohttp.Semaphore` 现已贬值 (4认同)