The garbage collector is trying to clean up connection asyncmy.connection.Connection

Cyr*_* N. 4 python sqlalchemy sanic

I will try to be as complete as possible on this issue.


I'm using Sanic, an ASGI Python framework, and I built a database manager on top of it.


This database manager uses a ContextVar to give access to the current db instance everywhere in the code.
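The ContextVar pattern used here can be sketched on its own: entering the context stores a fresh per-request object and keeps the token, so that exiting can restore the previous value. A minimal sketch (the Ctx class is a hypothetical stand-in for DBSessionContext):

```python
from contextvars import ContextVar

# plays the role of DBSession.context in the question's code
context = ContextVar("context", default=None)

class Ctx:
    """Hypothetical stand-in for DBSessionContext."""

ctx = Ctx()
token = context.set(ctx)      # what __aenter__ does: bind the new context
assert context.get() is ctx   # any code running in this task now sees it
context.reset(token)          # what __aexit__ does: restore the previous value
assert context.get() is None
```

Because ContextVar values are task-local, concurrent requests each see their own session context.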


Here is the database-related code:


database.py

```python
# -*- coding:utf-8 -*-

from sqlalchemy import exc, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import Pool, QueuePool, NullPool
from sqlalchemy.exc import OperationalError
from contextvars import ContextVar
from sentry_sdk import push_scope, capture_exception
from sanic import Sanic


class EngineNotInitialisedError(Exception):
    pass


class DBSessionContext:
    def __init__(self, read_session: Session, write_session: Session, commit_on_exit: bool = True) -> None:
        self.read_session = read_session
        self.write_session = write_session
        self.commit_on_exit = commit_on_exit

        self.token = None
        self._read = None
        self._write = None

    def _disable_flush(self, *args, **kwargs):
        raise NotImplementedError('Unable to flush a read-only session.')

    async def close(self, exc_type=None, exc_value=None, traceback=None):
        if self._write:
            try:
                if exc_value and getattr(exc_value, 'status_code', 500) > 300:
                    await self._write.rollback()
                else:
                    await self._write.commit()
            except Exception as e:
                pass

            try:
                await self._write.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

        if self._read:
            try:
                await self._read.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

    def set_token(self, token):
        self.token = token

    @property
    def read(self) -> Session:
        if not self._read:
            self._read = self.read_session()
            self._read.flush = self._disable_flush

        return self._read

    @property
    def write(self) -> Session:
        if not self._write:
            self._write = self.write_session()

        return self._write


class AsyncSession(SQLAlchemyAsyncSession):
    async def execute(self, statement, **parameters):
        return await super().execute(statement, parameters)

    async def first(self, statement, **parameters):
        executed = await self.execute(statement, **parameters)
        return executed.first()

    async def all(self, statement, **parameters):
        executed = await self.execute(statement, **parameters)
        return executed.all()


class DBSession:
    def __init__(self):
        self.app = None
        self.read_engine = None
        self.read_session = None
        self.write_engine = None
        self.write_session = None
        self._session = None
        self.context = ContextVar("context", default=None)
        self.commit_on_exit = True

    def init_app(self, app: Sanic) -> None:
        self.app = app
        self.commit_on_exit = self.app.config.get('DATABASE_COMMIT_ON_EXIT', cast=bool, default=True)

        self.read_engine = create_async_engine(
            self.app.config.get('DATABASE_READ_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
            **{
                'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
                'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
                'poolclass': QueuePool,  # will be used to create a connection pool instance using the connection parameters given in the URL
                # if pool_class is not NullPool:

                # if True will enable the connection pool "pre-ping" feature that tests connections for liveness upon each checkout
                'pool_pre_ping': self.app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
                # the number of connections to allow in connection pool "overflow"
                'max_overflow': self.app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
                # the number of connections to keep open inside the connection pool
                'pool_size': self.app.config.get('DATABASE_POOL_SIZE', cast=int, default=100),
                # this setting causes the pool to recycle connections after the given number of seconds has passed
                'pool_recycle': self.app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=3600),
                # number of seconds to wait before giving up on getting a connection from the pool
                'pool_timeout': self.app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=5),
            }
        )

        # @see https://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/
        self.read_session = sessionmaker(
            bind=self.read_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=False,
            autocommit=False
        )

        self.write_engine = create_async_engine(
            self.app.config.get('DATABASE_WRITE_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
            **{
                'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
                'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
                'poolclass': NullPool,  # will be used to create a connection pool instance using the connection parameters given in the URL
            }
        )

        self.write_session = sessionmaker(
            bind=self.write_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=True
        )

    async def __aenter__(self):
        session_ctx = DBSessionContext(self.read_session, self.write_session, self.commit_on_exit)
        session_ctx.set_token(self.context.set(session_ctx))

        return session_ctx

    async def __aexit__(self, exc_type, exc_value, traceback):
        session_ctx = self.context.get()
        try:
            await session_ctx.close(exc_type, exc_value, traceback)
        except Exception:
            pass

        self.context.reset(session_ctx.token)

    @property
    def read(self) -> Session:
        return self.context.get().read

    @property
    def write(self) -> Session:
        return self.context.get().write


@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.
    Implements pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''

    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except exc.OperationalError as ex:
        if ex.args[0] in (2006,   # MySQL server has gone away
                          2013,   # Lost connection to MySQL server during query
                          2055):  # Lost connection to MySQL server at '%s', system error: %d
            raise exc.DisconnectionError()  # caught by pool, which will retry with a new connection
        else:
            raise

    cursor.close()


db = DBSession()
```

This setup allows me to run something like this:

```python
from models import User
from database import db

@app.get('/user')
async def get_user(request):
    async with db:
        users = User.find_all()  # Special function in the Model that returns all users
        return json({'items': [{'id': x.id} for x in users]})
```

The __aenter__ and __aexit__ methods of the DBSession class (and, behind it, DBSessionContext) handle all of the cleanup when the code exits the async with block, including any exception that occurred.


The problem I'm facing is that Sentry sometimes reports the following error:


The garbage collector is trying to clean up connection <AdaptedConnection <asyncmy.connection.Connection object at 0x7f290c50dd30>>. This feature is unsupported on asyncio dbapis that lack a "terminate" feature, since no IO can be performed at this stage to reset the connection. Please close out all connections when they are no longer used, calling close() or using a context manager to manage their lifetime.


I don't understand why this happens. Even stranger, I often get this error on endpoints that don't use the database at all (the async with db block is still present, but nothing inside it touches the database).


The content of that function is a network call:

```python
import requests

@app.get('/notify')
async def get_user(request):
    async with db:
        requests.post('https://service.com/notify', data={'some': 'data'})

    return text('ok')
```

Here are my hypotheses, but I would like a clearer understanding of the problem:

  • Hypothesis 1: since reads use a QueuePool, the close call in __aexit__ may not actually close the connection; the connection stays open, which later causes the "garbage collector is trying to clean up connection" problem.
  • Hypothesis 2: the connection is opened by check_connection and kept open, causing the "garbage collector" problem.
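Regarding hypothesis 1, the QueuePool part can be checked in isolation: close() on a checked-out connection only returns it to the pool, and the underlying DBAPI connection stays open until the engine is disposed. A minimal synchronous sketch, using SQLite purely as a stand-in to observe the pool behaviour:

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# SQLite stand-in for the MySQL engines; the pooling behaviour is what matters
engine = create_engine("sqlite://", poolclass=QueuePool)

conn = engine.connect()
conn.close()                          # returned to the pool, not closed
assert engine.pool.checkedin() == 1   # the DBAPI connection is still open

engine.dispose()                      # dispose() actually closes pooled connections
assert engine.pool.checkedin() == 0
```

Note that pool retention alone should not trigger the warning, since the pool still owns the connection; the warning fires when a connection object ends up owned by nothing but the garbage collector.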

Any idea why I'm getting this "garbage collector" issue?


I'm using:

  • sanic==22.9.0
  • sqlalchemy[asyncio]==1.4.41
  • asyncmy==0.2.5

Lif*_*lex 8

The line that is probably causing your issue is: await session_ctx.close(exc_type, exc_value, traceback)

Try changing it to: await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))
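The reason this helps: the server may cancel the handler task (for example when a client disconnects), and if the cancellation lands inside session_ctx.close(), the session is never closed and the connection is left for the garbage collector. The effect of asyncio.shield can be sketched in isolation (close_connection is a hypothetical stand-in for session_ctx.close()):

```python
import asyncio

log = []

async def close_connection():
    # hypothetical stand-in for session_ctx.close(): cleanup that must finish
    await asyncio.sleep(0.05)
    log.append("closed")

async def handler(shielded: bool):
    try:
        if shielded:
            await asyncio.shield(close_connection())
        else:
            await close_connection()
    except asyncio.CancelledError:
        pass  # the handler task was cancelled mid-cleanup

async def run(shielded: bool):
    task = asyncio.create_task(handler(shielded))
    await asyncio.sleep(0.01)   # let the cleanup start
    task.cancel()               # what the server does on client disconnect
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.1)    # give a shielded cleanup time to finish

asyncio.run(run(shielded=False))
unshielded_log = list(log)      # cleanup was cancelled: nothing logged

log.clear()
asyncio.run(run(shielded=True))
shielded_log = list(log)        # cleanup completed despite the cancel
```

With shield, the cancellation still reaches the handler, but the inner cleanup coroutine runs to completion instead of being aborted mid-way.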

This safeguard was added to the SQLAlchemy codebase in July.


The change was implemented in /asyncio/engine.py and /asyncio/session.py.


Additional reference:

SQLAlchemy issue 8145

The change was included in version 1.4.40, released on August 8, 2022.