SQLAlchemy:方法“_connection_for_bind()”已在进行中

Cyr*_* N. 2 sqlalchemy python-asyncio

我最近更新了 SQLAlchemy(带有 [asyncio] 包),1.4.46并在提交时开始出现以下异常:

\n
\n

sqlalchemy.exc.IllegalStateChangeError:此处无法调用方法“commit()”;方法“_connection_for_bind()”已在进行中,这将导致状态意外更改为 <SessionTransactionState.CLOSED: 5>

\n
\n

在更新到新版本之前,它运行良好。

\n
# -*- coding:utf-8 -*-\n\nfrom sqlalchemy import exc, event, text\nfrom sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession\nfrom sqlalchemy.orm import sessionmaker, Session\nfrom sqlalchemy.ext.asyncio import AsyncEngine\nfrom sqlalchemy.pool import NullPool, Pool\nfrom contextvars import ContextVar\nfrom sanic import Sanic\nimport asyncio\n\n\nclass EngineNotInitialisedError(Exception):\n    pass\n\n\nclass DBSessionContext:\n    def __init__(self, session: Session, commit_on_exit: bool = True) -> None:\n        self.session = session\n        self._query = None\n        self.commit_on_exit = commit_on_exit\n        self.token = None\n\n    async def close(self, exc_type=None, exc_value=None, traceback=None):\n        if self._query:\n            if exc_value and getattr(exc_value, \'status_code\', 500) > 300:\n                await self._query.rollback()\n                self._post_processing.clear()\n            else:\n                await self._query.commit()\n                await self.run_post_processing()\n\n            await self._query.close()\n\n        if self._post_processing:\n            await self.run_post_processing()\n\n    def set_token(self, token):\n        self.token = token\n\n    @property\n    def query(self) -> Session:\n        if not self._query:\n            self._query = self.session()\n\n        return self._query\n\n\nclass AsyncSession(SQLAlchemyAsyncSession):\n    async def execute(self, statement, **parameters):\n        try:\n            if isinstance(statement, str):\n                # We wrap around the `text()` method automatically\n                statement = text(statement)\n            return await super().execute(statement, parameters)\n        except exc.OperationalError as e:\n            if e.orig.args[0] == 1205:\n                # Lock wait timeout exceeded\n                await self.rollback()\n                return await super().execute(statement, parameters)\n\n            raise e\n\n\nclass DBSession:\n    def __init__(self):\n        self.engine = None\n        self.session = None\n        self._session = None\n        self.context = ContextVar("context", default=None)\n\n    def init_app(self, app: Sanic, url: str, commit_on_exit: bool = True) -> None:\n        self.commit_on_exit = commit_on_exit\n\n        engine_args = {\n            \'echo\': app.config.get(\'DATABASE_ECHO\', cast=bool, default=False),\n            \'echo_pool\': app.config.get(\'DATABASE_ECHO_POOL\', cast=bool, default=False),\n            \'poolclass\': NullPool,  # will be used to create a connection pool instance using the connection parameters given in the URL\n            # if pool_class is not NullPool:\n\n            # the number of connections to allow in connection pool \xe2\x80\x9coverflow\xe2\x80\x9d\n            # \'max_overflow\': app.config.get(\'DATABASE_MAX_OVERFLOW\', cast=int, default=10),\n            # if True will enable the connection pool \xe2\x80\x9cpre-ping\xe2\x80\x9d feature that tests connections for liveness upon each checkout\n            # \'pool_pre_ping\': app.config.get(\'DATABASE_POOL_PRE_PING\', cast=bool, default=True),\n            # the number of connections to keep open inside the connection pool\n            # \'pool_size\': app.config.get(\'DATABASE_POOL_SIZE\', cast=int, default=5),\n            # this setting causes the pool to recycle connections after the given number of seconds has passed\n            # \'pool_recycle\': app.config.get(\'DATABASE_POOL_RECYCLE\', cast=int, default=-1),\n            # number of seconds to wait before giving up on getting a connection from the pool\n            # \'pool_timeout\': app.config.get(\'DATABASE_POOL_TIMEOUT\', cast=int, default=3600),\n        }\n\n        self.engine = create_async_engine(\n            url,\n            **engine_args\n        )\n\n        self.session = sessionmaker(\n            bind=self.engine,\n            expire_on_commit=False,\n            class_=AsyncSession,\n            autoflush=False\n        )\n\n    async def __aenter__(self):\n        if not isinstance(self.engine, AsyncEngine):\n            raise EngineNotInitialisedError\n\n        session_ctx = DBSessionContext(self.session, self.commit_on_exit)\n        session_ctx.set_token(self.context.set(session_ctx))\n\n        return session_ctx\n\n    async def __aexit__(self, exc_type, exc_value, traceback):\n        session_ctx = self.context.get()\n        await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))\n\n        self.context.reset(session_ctx.token)\n\n    @property\n    def query(self) -> Session:\n        return self.context.get().query\n\n\n@event.listens_for(Pool, "checkout")\ndef check_connection(dbapi_con, con_record, con_proxy):\n    \'\'\'Listener for Pool checkout events that pings every connection before using.\n    Implements pessimistic disconnect handling strategy. See also:\n    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic\'\'\'\n\n    cursor = dbapi_con.cursor()\n    try:\n        cursor.execute("SELECT 1")\n    except exc.OperationalError as ex:\n        if ex.args[0] in (2006,   # MySQL server has gone away\n                          2013,   # Lost connection to MySQL server during query\n                          2055):  # Lost connection to MySQL server at \'%s\', system error: %d\n            raise exc.DisconnectionError()  # caught by pool, which will retry with a new connection\n        else:\n            raise\n\n    cursor.close()\n\n\ndb = DBSession()\n
Run Code Online (Sandbox Code Playgroud)\n

该代码的调用方式如下:

\n
async with db:\n    await db.query.execute(\'INSERT INTO ...\')\n
Run Code Online (Sandbox Code Playgroud)\n

是什么导致了我遇到的 InvalidStateChangeError?我怎样才能避免这个问题?

\n

Cyr*_* N. 5

SQLAlchemy 的 Github 存储库上有一个讨论,给出了问题发生的原因: https: //github.com/sqlalchemy/sqlalchemy/discussions/9312

建议是代码调用类似的东西

asyncio.gather(func(session), func2(session)这两个函数共享同一个会话,这会导致sqlalchemy.exc.IllegalStateChangeError

删除asyncio.gather呼叫可以解决该问题。(或者使用两个会话,每个函数一个)。

  • 感谢您发布 GH 问题。我通过将 func1 和 func2 的执行移动到一个任务中来解决这个问题,该任务为每个收集的任务设置连接和会话 (4认同)