Cyr*_* N. 2 sqlalchemy python-asyncio
我最近将 SQLAlchemy(带 [asyncio] 扩展包)更新到了 1.4.46,
之后在提交(commit)时开始出现以下异常:
\n\nsqlalchemy.exc.IllegalStateChangeError: Method 'commit()' can't be called here; method '_connection_for_bind()' is already in progress and this would cause an unexpected state change to <SessionTransactionState.CLOSED: 5>
\n
在更新到新版本之前,它运行良好。
# -*- coding:utf-8 -*-

from sqlalchemy import exc, event, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.pool import NullPool, Pool
from contextvars import ContextVar
from sanic import Sanic
import asyncio


class EngineNotInitialisedError(Exception):
    """Raised when a DBSession context is entered before init_app() created the engine."""
    pass


class DBSessionContext:
    """Per-context holder of a lazily created session.

    The session is only instantiated on first access of ``query``; ``close()``
    then commits or rolls back and closes it.
    """

    def __init__(self, session: Session, commit_on_exit: bool = True) -> None:
        # `session` is called in the `query` property to build the actual
        # session object, i.e. it is used as a factory (DBSession passes its
        # sessionmaker here).
        self.session = session
        # The lazily created session; stays None until `query` is first read.
        self._query = None
        # NOTE(review): stored but never read in the code shown here, so
        # close() commits regardless of this flag — confirm intent.
        self.commit_on_exit = commit_on_exit
        # ContextVar token, set via set_token(); DBSession.__aexit__ uses it to
        # reset the context variable.
        self.token = None

    async def close(self, exc_type=None, exc_value=None, traceback=None):
        """Finish the unit of work: roll back or commit, then close the session.

        The arguments mirror those of ``__aexit__``; only ``exc_value`` is
        inspected. A rollback happens when an exception is present and its
        ``status_code`` attribute (defaulting to 500 when absent) is above 300;
        otherwise the session is committed.
        """
        # NOTE(review): `self._post_processing` and `self.run_post_processing`
        # are not defined anywhere in the code shown; presumably they were
        # trimmed from the excerpt — as written this raises AttributeError.
        if self._query:
            if exc_value and getattr(exc_value, 'status_code', 500) > 300:
                await self._query.rollback()
                self._post_processing.clear()
            else:
                await self._query.commit()
                await self.run_post_processing()

            await self._query.close()

        # NOTE(review): the original indentation was lost; this check is assumed
        # to sit at method level (outside `if self._query:`) — confirm.
        if self._post_processing:
            await self.run_post_processing()

    def set_token(self, token):
        """Remember the ContextVar token returned by ``ContextVar.set()``."""
        self.token = token

    @property
    def query(self) -> Session:
        """Return the context's session, creating it on first access.

        Every access within the same context returns the same session object.
        """
        if not self._query:
            self._query = self.session()

        return self._query


class AsyncSession(SQLAlchemyAsyncSession):
    """AsyncSession that accepts raw SQL strings and retries once on error 1205."""

    async def execute(self, statement, **parameters):
        """Execute ``statement``, passing the keyword arguments as its parameters.

        Plain strings are wrapped in ``text()``. If the driver reports error
        code 1205, the session is rolled back and the statement is executed a
        second time; any other OperationalError is re-raised.
        """
        try:
            if isinstance(statement, str):
                # We wrap around the `text()` method automatically
                statement = text(statement)
            # The collected keyword arguments are forwarded as a single
            # positional dict.
            return await super().execute(statement, parameters)
        except exc.OperationalError as e:
            if e.orig.args[0] == 1205:
                # Lock wait timeout exceeded
                await self.rollback()
                return await super().execute(statement, parameters)

            raise e


class DBSession:
    """Application-wide database handle used as an async context manager.

    ``init_app()`` builds the engine and session factory; ``async with``
    installs a fresh DBSessionContext in a ContextVar, and ``query`` exposes
    the session of the current context.
    """

    def __init__(self):
        self.engine = None
        # Becomes the sessionmaker in init_app().
        self.session = None
        # NOTE(review): never used in the code shown.
        self._session = None
        # Holds the DBSessionContext of the current context (None outside of
        # an `async with` block).
        self.context = ContextVar("context", default=None)

    def init_app(self, app: Sanic, url: str, commit_on_exit: bool = True) -> None:
        """Create the async engine and the session factory for ``url``.

        Engine echo settings are read from the Sanic app config
        (``DATABASE_ECHO``, ``DATABASE_ECHO_POOL``); pooling is disabled via
        ``NullPool``.
        """
        self.commit_on_exit = commit_on_exit

        engine_args = {
            'echo': app.config.get('DATABASE_ECHO', cast=bool, default=False),
            'echo_pool': app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
            'poolclass': NullPool, # will be used to create a connection pool instance using the connection parameters given in the URL
            # if pool_class is not NullPool:

            # the number of connections to allow in connection pool "overflow"
            # 'max_overflow': app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
            # if True will enable the connection pool "pre-ping" feature that tests connections for liveness upon each checkout
            # 'pool_pre_ping': app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
            # the number of connections to keep open inside the connection pool
            # 'pool_size': app.config.get('DATABASE_POOL_SIZE', cast=int, default=5),
            # this setting causes the pool to recycle connections after the given number of seconds has passed
            # 'pool_recycle': app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=-1),
            # number of seconds to wait before giving up on getting a connection from the pool
            # 'pool_timeout': app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=3600),
        }

        self.engine = create_async_engine(
            url,
            **engine_args
        )

        # Sessions are instances of the AsyncSession subclass defined above.
        self.session = sessionmaker(
            bind=self.engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=False
        )

    async def __aenter__(self):
        """Install a new DBSessionContext for the current context and return it.

        Raises:
            EngineNotInitialisedError: if init_app() has not created the engine.
        """
        if not isinstance(self.engine, AsyncEngine):
            raise EngineNotInitialisedError

        session_ctx = DBSessionContext(self.session, self.commit_on_exit)
        # Keep the token so __aexit__ can restore the previous context value.
        session_ctx.set_token(self.context.set(session_ctx))

        return session_ctx

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the current DBSessionContext and restore the ContextVar."""
        session_ctx = self.context.get()
        # Shielded so that cancelling the awaiting task does not cancel the
        # commit/rollback/close sequence itself.
        await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))

        self.context.reset(session_ctx.token)

    @property
    def query(self) -> Session:
        """Session of the current context (created lazily on first access)."""
        # Outside of an `async with db:` block the ContextVar holds None, so
        # this attribute access fails with AttributeError.
        return self.context.get().query


@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.
    Implements pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''

    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except exc.OperationalError as ex:
        if ex.args[0] in (2006, # MySQL server has gone away
                          2013, # Lost connection to MySQL server during query
                          2055): # Lost connection to MySQL server at '%s', system error: %d
            raise exc.DisconnectionError() # caught by pool, which will retry with a new connection
        else:
            raise

    # Only reached when the ping succeeded; on the raise paths above the cursor
    # is not explicitly closed.
    cursor.close()


# Module-level instance shared by the application.
db = DBSession()
Run Code Online (Sandbox Code Playgroud)\n该代码的调用方式如下:
# Call site: entering `db` installs a new DBSessionContext for the current
# context; leaving the block awaits its close().
async with db:
    await db.query.execute('INSERT INTO ...')
Run Code Online (Sandbox Code Playgroud)\n是什么导致了我遇到的 IllegalStateChangeError?我怎样才能避免这个问题?
\nSQLAlchemy 的 GitHub 存储库上有一个讨论,解释了该问题发生的原因:https://github.com/sqlalchemy/sqlalchemy/discussions/9312
讨论中指出,问题的原因是代码中存在类似下面的调用:
asyncio.gather(func(session), func2(session))
这两个函数并发地共享同一个会话,从而导致 sqlalchemy.exc.IllegalStateChangeError。
去掉 asyncio.gather 调用(改为顺序执行这两个函数)
即可解决该问题;或者使用两个会话,每个函数各用一个。
归档时间: |
|
查看次数: |
2716 次 |
最近记录: |