如何使用异步 sqlalchemy 访问关系?

muo*_*uon 18 python sqlalchemy async-await python-asyncio asyncpg

import asyncio

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/db",
        echo=True,
    )


# expire_on_commit=False will prevent attributes from being expired
# after commit.
async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)


Base = declarative_base()

class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    data = Column(String)
    create_date = Column(DateTime, server_default=func.now())
    bs = relationship("B")

    # required in order to access columns with server defaults
    # or SQL expression defaults, subsequent to a flush, without
    # triggering an expired load
    __mapper_args__ = {"eager_defaults": True}


class B(Base):
    __tablename__ = "b"
    id = Column(Integer, primary_key=True)
    a_id = Column(ForeignKey("a.id"))
    data = Column(String)
    
    
       

async with engine.begin() as conn:
    await conn.run_sync(Base.metadata.drop_all)
    await conn.run_sync(Base.metadata.create_all)


async with async_session() as session:
    async with session.begin():
        session.add_all(
            [
                A(bs=[B(), B()], data="a1"),
                A(bs=[B()], data="a2"),
            ]
        )


async with async_session() as session:
    result = await session.execute(select(A).order_by(A.id))
    a1 = result.scalars().first()

    # no issue: 
    print(a1.name, a1.data)

    # throws error:
    print(a1.bs)
    
Run Code Online (Sandbox Code Playgroud)

尝试访问a1.bs 会出现此错误:

     59     current = greenlet.getcurrent()
     60     if not isinstance(current, _AsyncIoGreenlet):
---> 61         raise exc.MissingGreenlet(
     62             "greenlet_spawn has not been called; can't call await_() here. "
     63             "Was IO attempted in an unexpected place?"

MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/14/xd2s)


Run Code Online (Sandbox Code Playgroud)

muo*_*uon 19

方法如下:

from sqlalchemy.orm import selectinload

async with async_session() as session:
    result = await session.execute(select(A).order_by(A.id)
                                            .options(selectinload(A.bs)))
    a = result.scalars().first()

    print(a.bs)

Run Code Online (Sandbox Code Playgroud)

关键是使用selectinload防止隐式IO的方法

更新

有几种选择selectinload喜欢joinedloadlazyload。我仍在尝试理解其中的差异。

  • 这甚至在尝试与 pydantic 之类的东西同步反序列化时也有效,因为对象引用已经存在,因此无需进行异步调用。 (4认同)

bfo*_*ine 13

如果您想要急切加载(更好),那么muon\xe2\x80\x99s 的答案是正确的。

\n

但是,如果由于某种原因您已经加载了模型并且稍后想要加载关系,则有一种从 SQLAlchemy 2.0.4开始的方法:

\n

使用session.refresh,您可以告诉它加载a1.bs

\n
await session.refresh(a1, attribute_names=["bs"])\nprint(a1.bs)  # This works\n
Run Code Online (Sandbox Code Playgroud)\n

来自文档

\n
\n

版本 2.0.4 中的新增功能:添加了对强制加载延迟加载关系的支持AsyncSession.refresh()和底层方法(如果它们在参数中显式命名)。Session.refresh()Session.refresh.attribute_names

\n
\n