Cyr*_* N. 4 python sqlalchemy sanic
我将尽力在这个问题上做到尽可能完整。
\n我正在使用 Sanic,一个 ASGI Python 框架,并在此基础上构建了一个数据库管理器。
\n该数据库管理器使用 ContextVar,以便在代码中的任何位置都能访问当前的 db 实例。
这是与数据库相关的代码:
\ndatabase.py
\n# -*- coding:utf-8 -*-\n\nfrom sqlalchemy import exc, event\nfrom sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession\nfrom sqlalchemy.orm import sessionmaker, Session\nfrom sqlalchemy.pool import Pool, QueuePool, NullPool\nfrom sqlalchemy.exc import OperationalError\nfrom contextvars import ContextVar\nfrom sentry_sdk import push_scope, capture_exception\nfrom sanic import Sanic\n\n\nclass EngineNotInitialisedError(Exception):\n pass\n\n\nclass DBSessionContext:\n def __init__(self, read_session: Session, write_session: Session, commit_on_exit: bool = True) -> None:\n self.read_session = read_session\n self.write_session = write_session\n self.commit_on_exit = commit_on_exit\n\n self.token = None\n self._read = None\n self._write = None\n\n def _disable_flush(self, *args, **kwargs):\n raise NotImplementedError(\'Unable to flush a read-only session.\')\n\n async def close(self, exc_type=None, exc_value=None, traceback=None):\n if self._write:\n try:\n if exc_value and getattr(exc_value, \'status_code\', 500) > 300:\n await self._write.rollback()\n else:\n await self._write.commit()\n except Exception as e:\n pass\n\n try:\n await self._write.close()\n except OperationalError as e:\n if e.orig.args[0] != 2013: # Lost connection to MySQL server during query\n raise e\n\n if self._read:\n try:\n await self._read.close()\n except OperationalError as e:\n if e.orig.args[0] != 2013: # Lost connection to MySQL server during query\n raise e\n\n def set_token(self, token):\n self.token = token\n\n @property\n def read(self) -> Session:\n if not self._read:\n self._read = self.read_session()\n self._read.flush = self._disable_flush\n\n return self._read\n\n @property\n def write(self) -> Session:\n if not self._write:\n self._write = self.write_session()\n\n return self._write\n\n\nclass AsyncSession(SQLAlchemyAsyncSession):\n async def execute(self, statement, **parameters):\n return await super().execute(statement, parameters)\n\n 
async def first(self, statement, **parameters):\n executed = await self.execute(statement, **parameters)\n return executed.first()\n\n async def all(self, statement, **parameters):\n executed = await self.execute(statement, **parameters)\n return executed.all()\n\n\nclass DBSession:\n def __init__(self):\n self.app = None\n self.read_engine = None\n self.read_session = None\n self.write_engine = None\n self.write_session = None\n self._session = None\n self.context = ContextVar("context", default=None)\n self.commit_on_exit = True\n\n def init_app(self, app: Surge) -> None:\n self.app = app\n self.commit_on_exit = self.app.config.get(\'DATABASE_COMMIT_ON_EXIT\', cast=bool, default=True)\n\n self.read_engine = create_async_engine(\n self.app.config.get(\'DATABASE_READ_URL\'),\n connect_args={\n \'connect_timeout\': self.app.config.get(\'DATABASE_CONNECT_TIMEOUT\', cast=int, default=3)\n },\n **{\n \'echo\': self.app.config.get(\'DATABASE_ECHO\', cast=bool, default=False),\n \'echo_pool\': self.app.config.get(\'DATABASE_ECHO_POOL\', cast=bool, default=False),\n \'poolclass\': QueuePool, # will be used to create a connection pool instance using the connection parameters given in the URL\n # if pool_class is not NullPool:\n\n # if True will enable the connection pool \xe2\x80\x9cpre-ping\xe2\x80\x9d feature that tests connections for liveness upon each checkout\n \'pool_pre_ping\': self.app.config.get(\'DATABASE_POOL_PRE_PING\', cast=bool, default=True),\n # the number of connections to allow in connection pool \xe2\x80\x9coverflow\xe2\x80\x9d\n \'max_overflow\': self.app.config.get(\'DATABASE_MAX_OVERFLOW\', cast=int, default=10),\n # the number of connections to keep open inside the connection pool\n \'pool_size\': self.app.config.get(\'DATABASE_POOL_SIZE\', cast=int, default=100),\n # this setting causes the pool to recycle connections after the given number of seconds has passed\n \'pool_recycle\': self.app.config.get(\'DATABASE_POOL_RECYCLE\', cast=int, 
default=3600),\n # number of seconds to wait before giving up on getting a connection from the pool\n \'pool_timeout\': self.app.config.get(\'DATABASE_POOL_TIMEOUT\', cast=int, default=5),\n }\n )\n\n # @see https://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/\n self.read_session = sessionmaker(\n bind=self.read_engine,\n expire_on_commit=False,\n class_=AsyncSession,\n autoflush=False,\n autocommit=False\n )\n\n self.write_engine = create_async_engine(\n self.app.config.get(\'DATABASE_WRITE_URL\'),\n connect_args={\n \'connect_timeout\': self.app.config.get(\'DATABASE_CONNECT_TIMEOUT\', cast=int, default=3)\n },\n **{\n \'echo\': self.app.config.get(\'DATABASE_ECHO\', cast=bool, default=False),\n \'echo_pool\': self.app.config.get(\'DATABASE_ECHO_POOL\', cast=bool, default=False),\n \'poolclass\': NullPool, # will be used to create a connection pool instance using the connection parameters given in the URL\n }\n )\n\n self.write_session = sessionmaker(\n bind=self.write_engine,\n expire_on_commit=False,\n class_=AsyncSession,\n autoflush=True\n )\n\n async def __aenter__(self):\n session_ctx = DBSessionContext(self.read_session, self.write_session, self.commit_on_exit)\n session_ctx.set_token(self.context.set(session_ctx))\n\n return session_ctx\n\n async def __aexit__(self, exc_type, exc_value, traceback):\n session_ctx = self.context.get()\n try:\n await session_ctx.close(exc_type, exc_value, traceback)\n except Exception:\n pass\n\n self.context.reset(session_ctx.token)\n\n @property\n def read(self) -> Session:\n return self.context.get().read\n\n @property\n def write(self) -> Session:\n return self.context.get().write\n\n\n@event.listens_for(Pool, "checkout")\ndef check_connection(dbapi_con, con_record, con_proxy):\n \'\'\'Listener for Pool checkout events that pings every connection before using.\n Implements pessimistic disconnect handling strategy. 
See also:\n http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic\'\'\'\n\n cursor = dbapi_con.cursor()\n try:\n cursor.execute("SELECT 1")\n except exc.OperationalError as ex:\n if ex.args[0] in (2006, # MySQL server has gone away\n 2013, # Lost connection to MySQL server during query\n 2055): # Lost connection to MySQL server at \'%s\', system error: %d\n raise exc.DisconnectionError() # caught by pool, which will retry with a new connection\n else:\n raise\n\n cursor.close()\n\n\ndb = DBSession()\nRun Code Online (Sandbox Code Playgroud)\n这个配置允许我运行类似的东西:
\nfrom models import User\nfrom database import db\n\n@app.get(\'/user\')\nasync def get_user(request):\n async with db:\n users = User.find_all() # Special function in the Model that returns all users\n return json({\'items\': [{\'id\': x.id for x in users}])\nRun Code Online (Sandbox Code Playgroud)\n当代码退出 async with 时,DBSession 类(以及后续的 DBSessionContext)中的 __aenter__ 和 __aexit__ 会处理大部分事情,包括发生的任何异常。
我遇到的问题是,Sentry 有时会报告以下错误:
\n\n\n垃圾收集器正在尝试清理连接 <AdaptedConnection <asyncmy.connection.Connection object at 0x7f290c50dd30>>。缺少“终止”功能的 asyncio dbapi 不支持此功能,因为在此阶段无法执行 IO 来重置连接。当不再使用连接时,请关闭所有连接,调用
\nclose()或使用上下文管理器来管理它们的生命周期。
我不明白为什么会发生这种情况。更奇怪的是,我经常在根本不使用数据库的函数调用上遇到此错误(async with db 仍然存在,但其内部根本不使用数据库)。
该函数的内容是网络调用:
\nimport requests\n\n@app.get(\'/notify\')\nasync def get_user(request):\n async with db:\n requests.post(\'https://service.com/notify\', data={\'some\': \'data\'})\n\n return text(\'ok\')\nRun Code Online (Sandbox Code Playgroud)\n以下是我的假设,但我希望对这个问题有更清晰的看法:
\n__aexit__ 中的 close 并不会真正关闭连接,因此连接保持打开状态,从而导致稍后出现“垃圾收集器正在尝试清理连接”问题。连接在 check_connection 中被打开并保持打开状态,导致“垃圾收集器”问题。知道为什么我会遇到“垃圾收集器”问题吗?
\n我正在使用:
\n| 归档时间: |
|
| 查看次数: |
4035 次 |
| 最近记录: |