SqlAlchemy 1.4 抛出 InternalServerError(缓存查找失败,类型 3912040)

Hou*_*man 5 python postgresql sqlalchemy asyncpg

我正在使用 SqlAlchemy 1.4.18(异步),我相信我遇到了无法解释的竞争条件。底层数据库是Postgresasyncpg由SQLAlchemy的内部使用。

我在 SQL Alchemy Core 中有以下插入函数。

async def create_device(
    device_id: str,
    device_type: DeviceType,
    account_type: AccountType = AccountType.FREE,
    expires_at: Optional[datetime] = None,
    account_id: Optional[int] = None,
    is_banned: bool = False,
    last_login_at: Optional[datetime] = None,
) -> datetime:
    if expires_at is None:
        expires_at = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
            days=7
        )
    async with engine.begin() as conn:
        await conn.execute(
            DeviceTable.insert().values(
                id=device_id,
                type=device_type,
                expires_at=expires_at,
                account_type=account_type,
                account_id=account_id,
                is_banned=is_banned,
                last_login_at=last_login_at,
            ),
        )
        return expires_at
Run Code Online (Sandbox Code Playgroud)

单元测试自行成功运行。但是,当我在测试类中运行所有测试时,此测试每次都会失败。

@pytest.mark.asyncio
    @patch("service.email_service.EmailService.confirm_token")
    async def test_confirm_email_already_confirmed(self, mock_token, client):
        expiry_date = self.get_time_in_future()
        account_id = await crud_account.create_account(
            "h@h1.de", "pass1", is_confirmed=True
        )
        await crud_device.create_device(
            "u1", DeviceType.IPHONE, account_id=account_id, expires_at=expiry_date
        )
## It has already failed at this point.
        mock_token.return_value = "h@h1.de"
        result = await client.get("/email/confirm/t1")
        assert result.status_code == 200

Run Code Online (Sandbox Code Playgroud)

错误:

../app/database/crud_device.py:26: in create_device
    await conn.execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/engine.py:405: in execute
    result = await greenlet_spawn(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:125: in greenlet_spawn
    result = context.throw(*sys.exc_info())
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1582: in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:324: in _execute_on_connection
    return connection._execute_clauseelement(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1451: in _execute_clauseelement
    ret = self._execute_context(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1813: in _execute_context
    self._handle_dbapi_exception(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1994: in _handle_dbapi_exception
    util.raise_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py:207: in raise_
    raise exception
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1770: in _execute_context
    self.dialect.do_execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py:717: in do_execute
    cursor.execute(statement, parameters)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:449: in execute
    self._adapt_connection.await_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:67: in await_only
    return current.driver.switch(awaitable)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:120: in greenlet_spawn
    value = await result
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:424: in _prepare_and_execute
    self._handle_exception(error)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:358: in _handle_exception
    self._adapt_connection._handle_exception(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_connection object at 0x115007340>
error = InternalServerError('cache lookup failed for type 3912040')

    def _handle_exception(self, error):
        if self._connection.is_closed():
            self._transaction = None
            self._started = False
    
        if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
            exception_mapping = self.dbapi._asyncpg_error_translate
    
            for super_ in type(error).__mro__:
                if super_ in exception_mapping:
                    translated_error = exception_mapping[super_](
                        "%s: %s" % (type(error), error)
                    )
                    translated_error.pgcode = (
                        translated_error.sqlstate
                    ) = getattr(error, "sqlstate", None)
>                   raise translated_error from error
E                   sqlalchemy.exc.InternalError: (sqlalchemy.dialects.postgresql.asyncpg.InternalServerError) <class 'asyncpg.exceptions.InternalServerError'>: cache lookup failed for type 3912040
E                   [SQL: INSERT INTO main.device (id, type, created_at, last_login_at, expires_at, account_type, is_banned, account_id) VALUES (%s, %s, now(), NULL, %s, %s, %s, %s)]
E                   [parameters: ('u1', 'IPHONE', datetime.datetime(2021, 6, 16, 10, 24), 'FREE', False, 1)]
E                   (Background on this error at: http://sqlalche.me/e/14/2j85)

../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:652: InternalError
Run Code Online (Sandbox Code Playgroud)

这是什么translated_error?非常感谢

Nad*_*idi 1

我能够解决这个问题。这似乎是asyncpg驱动程序准备好的语句缓存的问题。这听起来像是一个常见问题,因为他们在此处的常见问题解答中提到了它

query_cache_size=0我首先尝试通过在 SQLAlchemy 中设置 来关闭此缓存行为create_async_engineSQLAlchemy 文档中提到了这一点:

async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}",
    echo=True,
    query_cache_size=0
)
Run Code Online (Sandbox Code Playgroud)

不过看echo吐出的SQL日志,似乎还是使用了缓存。我认为这是因为我误解了asyncpg和 SQLAlchemy 似乎都实现了某种语句缓存。SQLAlchemy 参数不会影响asyncpg行为。

然后我在SQLAlchemy GitHub Issue 6467上找到了一些关于asyncpg准备好的语句缓存的讨论。

根据该线程中的评论,我能够通过prepared_statement_cache_size=0直接在 PostgreSQL URI 中传递作为查询参数来解决该问题。新的工作create_async_engine看起来像:

async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}?prepared_statement_cache_size=0",
    echo=True
)
Run Code Online (Sandbox Code Playgroud)

注意query_cache_sizeSQLAlchemy 参数不会影响此问题,因此我删除了该参数,以允许 SQLAlchemy 使用 1.4 中的默认行为缓存其已编译的 SQL 语句。