Mar*_*dru 31 sqlalchemy python-3.x python-asyncio
In the SQLAlchemy async ORM engine, how do I query a table and get one value or all values?
I know that with the non-async API I can do this:
SESSION.query(TableClass).get(x)
But trying the same thing with the async API raises the following error:
AttributeError: 'AsyncSession' object has no attribute 'query'.
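This is how I define the SESSION variable. The LOOP variable is just asyncio.get_event_loop(); it is used to run the async functions when the sql module is loaded and to populate the variables used as a cache, so that I don't have to query the database every time I need the data: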
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from .. import CONFIG, LOOP

def _build_async_db_uri(uri):
    if "+asyncpg" not in uri:
        return '+asyncpg:'.join(uri.split(":", 1))
    return uri

async def start() -> declarative_base:
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async with engine.begin() as conn:
        BASE.metadata.bind = engine
        await conn.run_sync(BASE.metadata.create_all)
    return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))

BASE = declarative_base()
SESSION = LOOP.run_until_complete(start())
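For readers puzzling over the join/split one-liner in _build_async_db_uri: it inserts the asyncpg driver name right after the dialect name. Below is a minimal standalone sketch of the same idea; the function name build_async_db_uri and the sample URI are made up for illustration and are not part of the original code.

```python
def build_async_db_uri(uri: str) -> str:
    """Insert the asyncpg driver after the dialect name unless it is already there."""
    if "+asyncpg" not in uri:
        # Split only on the first colon: "postgresql" | "//user:secret@localhost/mydb"
        dialect, rest = uri.split(":", 1)
        return f"{dialect}+asyncpg:{rest}"
    return uri


# Hypothetical URI, used only to show the transformation.
print(build_async_db_uri("postgresql://user:secret@localhost/mydb"))
# postgresql+asyncpg://user:secret@localhost/mydb
```

Calling it on a URI that already names the driver returns the URI unchanged, so it is safe to apply more than once.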
Here is an example of a table and the cache function:
class TableClass(BASE):
    __tablename__ = "tableclass"
    id = Column(Integer, primary_key=True)
    alias = Column(Integer)

CACHE = {}

async def _load_all():
    global CACHE
    try:
        # This is the call that fails: AsyncSession has no attribute 'query'
        curr = await SESSION.query(TableClass).all()
        CACHE = {i.id: i.alias for i in curr}
    except:
        pass

LOOP.run_until_complete(_load_all())
小智 34
session.query is the legacy API. The async version uses select and its accompanying methods.
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)

CACHE = {}

async def _load_all():
    global CACHE
    try:
        async with async_session() as session:
            q = select(TableClass)
            result = await session.execute(q)
            curr = result.scalars()
            CACHE = {i.id: i.alias for i in curr}
    except:
        pass

LOOP.run_until_complete(_load_all())
You can read more about SQLAlchemy's asynchronous I/O support in the SQLAlchemy asyncio documentation.
Ram*_*ias 11
New in version 1.4: added Session.get(), which was moved from the now-deprecated Query.get() method.
id = 1
user = await session.get(User, id)
Session.get - SQLAlchemy 1.4 documentation
The proper way to query is with a select statement.
statement = select(User).where(User.name == 'John')
result = await session.execute(statement)

# A result can be consumed only once, so use ONE of the following per execute() call.

# This will return a collection of users named 'John'
johns: list[User] = result.scalars().all()

# This will take the first 'John'
first_john: User = result.scalar()

# This will take one result.
# If your query has more than one result, an exception will be thrown.
# Use it only if you're expecting a single result
john: User = result.scalars().one()
Related: https://stackoverflow.com/a/63591326/8843585