Hou*_*man 5 python postgresql sqlalchemy asyncpg
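I am using SQLAlchemy 1.4.18 (async), and I believe I am hitting a race condition that I cannot explain. The underlying database is Postgres, with asyncpg used internally by SQLAlchemy.
I have the following insert function written with SQLAlchemy Core.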
from datetime import datetime, timedelta
from typing import Optional


async def create_device(
    device_id: str,
    device_type: DeviceType,
    account_type: AccountType = AccountType.FREE,
    expires_at: Optional[datetime] = None,
    account_id: Optional[int] = None,
    is_banned: bool = False,
    last_login_at: Optional[datetime] = None,
) -> datetime:
    # Default expiry: 7 days from now, truncated to the minute.
    if expires_at is None:
        expires_at = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
            days=7
        )
    # engine.begin() opens a transaction and commits it on exit.
    async with engine.begin() as conn:
        await conn.execute(
            DeviceTable.insert().values(
                id=device_id,
                type=device_type,
                expires_at=expires_at,
                account_type=account_type,
                account_id=account_id,
                is_banned=is_banned,
                last_login_at=last_login_at,
            ),
        )
    return expires_at
The unit test passes when run on its own. However, when I run all the tests in the test class, this test fails every time.
@pytest.mark.asyncio
@patch("service.email_service.EmailService.confirm_token")
async def test_confirm_email_already_confirmed(self, mock_token, client):
    expiry_date = self.get_time_in_future()
    account_id = await crud_account.create_account(
        "h@h1.de", "pass1", is_confirmed=True
    )
    await crud_device.create_device(
        "u1", DeviceType.IPHONE, account_id=account_id, expires_at=expiry_date
    )
    ## It has already failed at this point.
    mock_token.return_value = "h@h1.de"
    result = await client.get("/email/confirm/t1")
    assert result.status_code == 200
Error:
../app/database/crud_device.py:26: in create_device
await conn.execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/engine.py:405: in execute
result = await greenlet_spawn(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:125: in greenlet_spawn
result = context.throw(*sys.exc_info())
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1582: in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:324: in _execute_on_connection
return connection._execute_clauseelement(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1451: in _execute_clauseelement
ret = self._execute_context(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1813: in _execute_context
self._handle_dbapi_exception(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1994: in _handle_dbapi_exception
util.raise_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py:207: in raise_
raise exception
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1770: in _execute_context
self.dialect.do_execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py:717: in do_execute
cursor.execute(statement, parameters)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:449: in execute
self._adapt_connection.await_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:67: in await_only
return current.driver.switch(awaitable)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:120: in greenlet_spawn
value = await result
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:424: in _prepare_and_execute
self._handle_exception(error)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:358: in _handle_exception
self._adapt_connection._handle_exception(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_connection object at 0x115007340>
error = InternalServerError('cache lookup failed for type 3912040')
    def _handle_exception(self, error):
        if self._connection.is_closed():
            self._transaction = None
            self._started = False
        if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
            exception_mapping = self.dbapi._asyncpg_error_translate
            for super_ in type(error).__mro__:
                if super_ in exception_mapping:
                    translated_error = exception_mapping[super_](
                        "%s: %s" % (type(error), error)
                    )
                    translated_error.pgcode = (
                        translated_error.sqlstate
                    ) = getattr(error, "sqlstate", None)
>                   raise translated_error from error
E sqlalchemy.exc.InternalError: (sqlalchemy.dialects.postgresql.asyncpg.InternalServerError) <class 'asyncpg.exceptions.InternalServerError'>: cache lookup failed for type 3912040
E [SQL: INSERT INTO main.device (id, type, created_at, last_login_at, expires_at, account_type, is_banned, account_id) VALUES (%s, %s, now(), NULL, %s, %s, %s, %s)]
E [parameters: ('u1', 'IPHONE', datetime.datetime(2021, 6, 16, 10, 24), 'FREE', False, 1)]
E (Background on this error at: http://sqlalche.me/e/14/2j85)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:652: InternalError
What is this translated_error? Thanks very much.
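I was able to solve this. It appears to be a problem with the asyncpg driver's prepared statement cache. It seems to be a common issue, since it is mentioned in the asyncpg FAQ.
I first tried to turn off this caching behavior by setting query_cache_size=0 in SQLAlchemy's create_async_engine. This option is described in the SQLAlchemy documentation: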
async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}",
    echo=True,
    query_cache_size=0
)
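However, judging by the SQL log emitted by echo, a cache still appeared to be in use. I think I had misunderstood the situation: asyncpg and SQLAlchemy each implement their own kind of statement caching, and the SQLAlchemy parameter does not affect asyncpg's behavior.
I then found some discussion of the asyncpg prepared statement cache in SQLAlchemy GitHub Issue 6467.
Based on the comments in that thread, I was able to fix the problem by passing prepared_statement_cache_size=0 as a query parameter directly in the PostgreSQL URI. The new, working create_async_engine call looks like this: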
async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}?prepared_statement_cache_size=0",
    echo=True
)
Note: the SQLAlchemy query_cache_size parameter has no effect on this problem, so I removed it again. That lets SQLAlchemy cache its compiled SQL statements using the default behavior in 1.4.
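If the connection URL is assembled in more than one place, it may help to add the prepared_statement_cache_size=0 parameter in a single helper rather than by string concatenation. The following is only a sketch using the standard library: the names with_query_param and build_asyncpg_url are hypothetical and not part of SQLAlchemy or asyncpg, and percent-encoding the credentials is an added assumption (the original f-string inserts them unescaped).

```python
from urllib.parse import parse_qsl, quote, urlencode, urlsplit, urlunsplit


def with_query_param(url: str, key: str, value: str) -> str:
    """Return url with key=value in its query string, replacing any existing value."""
    parts = urlsplit(url)
    # Keep every existing parameter except the one being set.
    params = [
        (k, v)
        for k, v in parse_qsl(parts.query, keep_blank_values=True)
        if k != key
    ]
    params.append((key, value))
    return urlunsplit(parts._replace(query=urlencode(params)))


def build_asyncpg_url(user: str, password: str, server: str, db: str) -> str:
    """Build a postgresql+asyncpg URL with asyncpg's prepared statement cache disabled."""
    # Percent-encode credentials so characters such as "@" or ":" do not
    # break the URL structure (an assumption beyond the original answer).
    base = (
        f"postgresql+asyncpg://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{server}/{db}"
    )
    return with_query_param(base, "prepared_statement_cache_size", "0")


if __name__ == "__main__":
    print(build_asyncpg_url("user", "secret", "localhost", "app"))
```

The resulting string would then be passed to create_async_engine in place of the hand-written f-string shown above.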