我有一个网络服务器,大致如下:
async def websocket_handler(request):
    """Stream search results to a websocket client, one card per text message.

    Every incoming text message is parsed as JSON. If it contains a ``"q"``
    key, the whole message is forwarded as query parameters to
    ``{SEARCH_ROOT}/s`` and the pending card queue is replaced by the returned
    ``"results"`` followed by a ``{"done": True}`` sentinel. After that, the
    next pending card (if any) is sent; when nothing is pending the websocket
    is closed.

    Returns the prepared ``web.WebSocketResponse``.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # ==========================================
    cards = []
    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            Q = json.loads(msg.data)
            if "q" in Q:
                # NOTE: a brand-new ClientSession (and connector) is created for
                # every search message; this is the pattern the question is about.
                async with aiohttp.ClientSession(cookies=request.cookies) as session:
                    async with session.get(f"{SEARCH_ROOT}/s", params=Q) as resp:
                        doc_order = await resp.json()
                cards = [
                    *doc_order["results"],
                    {"done": True},
                ]
            # NOTE(review): nesting reconstructed from an unindented paste;
            # confirm this branch belongs inside the TEXT-message branch.
            if cards:
                card = cards.pop(0)
                await ws.send_json(card)
            else:
                # close() is a coroutine: without ``await`` it never runs and
                # the closing handshake is never sent.
                await ws.close()
    return ws
Run Code Online (Sandbox Code Playgroud)
这段代码工作得很好,但问题是:在建立大约 13-15 个新的 websocket 连接之后,我开始看到提示无法启动新线程的错误(can't start new thread)。
文档提到,每个服务器实例应该只创建一个客户端会话(ClientSession),但我无法弄清楚如何做到这一点。我有一些想法:
我得到的确切回溯是这样的:
Traceback (most recent call last): [58/1849]
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_protocol.py", line 418, in start
resp = await task
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_app.py", line 458, in _handle
resp = await handler(request)
File "/home/user/myproject/myproject/api/websocket.py", line 22, in websocket_handler
async with aiohttp.request('GET', f"{SEARCH_ROOT}/s", params=Q, cookies=request.cookies) as resp:
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/client.py", line 1043, in __aenter__
self._resp = await self._coro
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/client.py", line 476, in _request
timeout=real_timeout
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/connector.py", line 522, in connect
proto = await self._create_connection(req, traces, timeout)
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/connector.py", line 854, in _create_connec
tion
req, traces, timeout)
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/connector.py", line 955, in _create_direct
_connection
traces=traces), loop=self._loop)
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/connector.py", line 825, in _resolve_host
self._resolver.resolve(host, port, family=self._family)
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/resolver.py", line 30, in resolve
host, port, type=socket.SOCK_STREAM, family=family)
File "/usr/lib/python3.6/asyncio/base_events.py", line 681, in getaddrinfo
host, port, family, type, proto, flags)
File "/usr/lib/python3.6/asyncio/base_events.py", line 644, in run_in_executor
return futures.wrap_future(executor.submit(func, *args), loop=self)
File "/usr/lib/python3.6/concurrent/futures/thread.py", line 123, in submit
self._adjust_thread_count()
File "/usr/lib/python3.6/concurrent/futures/thread.py", line 142, in _adjust_thread_count
t.start()
File "/usr/lib/python3.6/threading.py", line 846, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
Unhandled exception
Traceback (most recent call last):
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_protocol.py", line 447, in start
await resp.prepare(request)
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_response.py", line 353, in prepare
return await self._start(request)
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_response.py", line 667, in _start
return await super()._start(request)
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_response.py", line 410, in _start
await writer.write_headers(status_line, headers)
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/http_writer.py", line 112, in write_header
s
self._write(buf)
File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/http_writer.py", line 67, in _write
raise ConnectionResetError('Cannot write to closing transport')
ConnectionResetError: Cannot write to closing transport
Run Code Online (Sandbox Code Playgroud)
下面是一个最小可运行示例,演示如何为每个 aiohttp 服务器(即 app)创建单个 aiohttp.ClientSession() 实例。
aiohttp.ClientSession() 的实例在 app_factory 函数中创建 [1]。该实例存储在应用程序实例中 [2],可以在请求处理程序中通过 request.app['client_session'] 访问。aiohttp.ClientSession() 通过清理上下文(Cleanup Context)[3] 正确关闭。
import argparse
import logging
from typing import AsyncIterator, Final, NoReturn

import aiohttp
from aiohttp import web
from aiohttp.web_request import Request
# Configure the root logger so DEBUG-level messages are emitted.
logging.basicConfig(level=logging.DEBUG)
# Route table that handlers register on via decorators (e.g. @routes.get);
# it is attached to the application with app.add_routes(routes).
routes: Final = web.RouteTableDef()
# Module-level logger named after this module.
logger: Final = logging.getLogger(__name__)
@routes.get('/')
async def hello(request: Request) -> web.Response:
    """Fetch a JSON document with the shared client session and relay it.

    The session is read from ``request.app['client_session']`` rather than
    being created per request; the upstream JSON body is returned to the
    caller as a JSON response.
    """
    shared_session = request.app['client_session']
    async with shared_session.get("https://httpbin.org/json") as upstream:
        payload = await upstream.json()
        reply = web.json_response(payload)
    return reply
async def client_session_ctx(app: web.Application) -> AsyncIterator[None]:
    """
    Cleanup context async generator to create and properly close aiohttp ClientSession

    The code before ``yield`` creates the session and stores it under
    ``app['client_session']``; the code after ``yield`` closes that session.
    The function is an async generator, so its return type is annotated as
    ``AsyncIterator[None]`` (it yields once, with no value).

    Ref.:
    > https://docs.aiohttp.org/en/stable/web_advanced.html#cleanup-context
    > https://docs.aiohttp.org/en/stable/web_advanced.html#aiohttp-web-signals
    > https://docs.aiohttp.org/en/stable/web_advanced.html#data-sharing-aka-no-singletons-please
    """
    logger.debug('Creating ClientSession')
    app['client_session'] = aiohttp.ClientSession()
    # Hand control back while the application runs.
    yield
    logger.debug('Closing ClientSession')
    await app['client_session'].close()
async def app_factory() -> web.Application:
    """
    Build the web application: create it, attach the route table, and
    register the client-session cleanup context, logging each step.

    See: https://docs.aiohttp.org/en/stable/web_advanced.html
    """
    logger.debug('[APP Factory] Creating Application (entering APP Factory)')
    application = web.Application()

    logger.debug('[APP Factory] Adding Routes')
    application.add_routes(routes)

    logger.debug('[APP Factory] Registering Cleanup contexts')
    application.cleanup_ctx.append(client_session_ctx)

    logger.debug('[APP Factory] APP is now prepared and can be returned by APP Factory')
    return application
if __name__ == '__main__':
    # Read the listening port from the command line (defaults to 8000).
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--port',
        default=8000,
        type=int,
        help='Port to run HTTP server',
    )
    options = cli.parse_args()
    # Pass the factory coroutine to run_app together with the chosen port.
    web.run_app(app_factory(), port=options.port)
Run Code Online (Sandbox Code Playgroud)
[1] https://docs.aiohttp.org/en/stable/web_advanced.html#passing-a-coroutine-into-run-app-and-gunicorn
[2] https://docs.aiohttp.org/en/stable/web_advanced.html#data-sharing-aka-no-singletons-please
[3] https://docs.aiohttp.org/en/stable/web_advanced.html#cleanup-context
| 归档时间: |
|
| 查看次数: |
2430 次 |
| 最近记录: |