如何在异步 sqlalchemy 中使用原始 sql?

Pom*_*oma 3 python sqlalchemy python-3.x

我有 asyncio sqlalchemy 代码:

import asyncio
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import text, Column, Integer, String
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

async_engine = create_async_engine(f'mysql+aiomysql://root:example@127.0.0.1:3306/spy-bot')
AsyncSession = sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)

Base = declarative_base()
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(255))

async def main():
    async with AsyncSession() as session:
        stmt = text('SELECT * FROM `users` LIMIT 1')
        result = await session.execute(stmt)
        user = result.one()
        print(type(user), user)

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

如何在仍然使用原始 sql 的情况下使会话查询返回 User 类上的实例?

在同步版本上,这看起来像

result = session.query(User).from_statement(text(query))
Run Code Online (Sandbox Code Playgroud)

sna*_*erb 5

您可以以相同的方式执行此操作,尽管使用 2.0 样式语法,因为asyncio 不支持Query遗留类:

result = await session.scalars(select(User).from_statement(text_object))
Run Code Online (Sandbox Code Playgroud)

从文本语句获取 ORM 结果中的文档适用。

完整的函数可能如下所示:

import sqlalchemy as sa
...
async def main():
    async with AsyncSession() as session:
        stmt = text('SELECT * FROM `users` LIMIT 1')
        result = await session.scalars(sa.select(User).from_statement(stmt))
        user = result.one()
        print(type(user), user)
    await async_engine.dispose()
Run Code Online (Sandbox Code Playgroud)