如何使用 FastAPI 和 SQLAlchemy 进行异步操作

Exp*_*oit 2 sqlalchemy python-3.x python-asyncio fastapi

我正在使用翻译软件,表达上可能存在一些错误,请谅解。我已经查了网上的教程并按照相应的思路进行操作,但是遇到了一点问题:我无法使用异步 SQLAlchemy 来操作 MySQL。

数据库连接

from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from urllib import  parse
import  sys

# Async engine. Sql_URL is not defined anywhere in this snippet and must be
# supplied elsewhere.
engine = create_async_engine(Sql_URL)


# NOTE(review): no class_ argument is passed, so sessionmaker uses its default
# session class instead of AsyncSession, even though `engine` is an async engine.
SessionLocal = sessionmaker(bind=engine,autocommit=False,autoflush=False)
# Scoped-session registry built on top of the factory above.
db_session = scoped_session(SessionLocal)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()
# Expose a `query` attribute on every model, backed by the scoped session.
Base.query = db_session.query_property()
Run Code Online (Sandbox Code Playgroud)

模型

import asyncio
import datetime

from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, MetaData
from sqlalchemy.orm import registry
from  sqlalchemy_utils import EmailType,ChoiceType

from app.Fast_blog.database import Base


class user(Base):
    """ORM model mapped to the ``usertable`` table."""

    # Pairs handed to ChoiceType for the `gender` column below.
    choices = [
        ('0', 'woman'),
        ('1', 'man'),
        ('2', 'NULL')
    ]
    __tablename__ = "usertable"
    # extend_existing allows this definition to be applied on top of a table
    # of the same name that is already registered in the metadata.
    __table_args__ = {'extend_existing': True}
    # Integer primary key, also indexed.
    UserId = Column(Integer,primary_key = True,index = True)
    username = Column(String(255))
    # NOTE(review): plain String column; nothing in this snippet shows whether
    # the stored value is hashed — confirm against the code that writes it.
    userpassword = Column(String(255))
    gender = Column(ChoiceType(choices))
    # The function object itself is passed as the default (it is not called
    # here), so it is not evaluated at class-definition time.
    creation_time = Column(DateTime,default = datetime.datetime.now)
    # NOTE(review): same default as creation_time and no onupdate is set, so
    # nothing visible here refreshes this value on login.
    Last_Login_Time = Column(DateTime,default= datetime.datetime.now)
    UserUuid = Column(String(255))
    UserEmail = Column(EmailType(255))

class Blog(Base):
    """ORM model mapped to the ``blogtable`` table."""

    __tablename__ = "blogtable"
    # extend_existing allows this definition to be applied on top of a table
    # of the same name that is already registered in the metadata.
    __table_args__ = {'extend_existing': True}
    title = Column(String(255))
    # NOTE(review): content is capped at String(255) — confirm that is long
    # enough for a blog body.
    content = Column(String(255))
    # Stored as a plain string; no ForeignKey to the user table is declared here.
    author = Column(String(255))
    # Primary key is a String(255), not an integer; also indexed.
    BlogId = Column(String(255),primary_key=True,index=True)
    BlogUuid = Column(String(32))
Run Code Online (Sandbox Code Playgroud)

视图

import uuid
from typing import Generator

from sqlalchemy.ext.asyncio import async_session
from sqlalchemy import select

from app.Fast_blog import model
from app.Fast_blog.model import models
from app.Fast_blog.database.database import engine
from sqlalchemy.orm import sessionmaker
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import EmailStr
from  fastapi import APIRouter

from app.Fast_blog.model.models import user

# NOTE(review): as in the database module, no class_ argument is passed, so this
# factory uses sessionmaker's default session class rather than AsyncSession.
SessionLocal = sessionmaker(autocommit=False,autoflush=False,bind=engine)
# A single session instance created at import time and shared by the handlers
# in this module.
Session = SessionLocal()


# Router that the endpoints in this module are registered on.
UserApp = APIRouter()

# Template loader rooted at the project's templates directory (relative path).
templates = Jinja2Templates(directory="./Fast_blog/templates")

# Serve files from the static directory under the "/static" path.
UserApp.mount("/static", StaticFiles(directory="./Fast_blog/static"), name="static")


def UUID_crt(UuidApi):
    """Return the version-5 UUID of ``UuidApi`` within the DNS namespace.

    The result is deterministic: the same input string always produces the
    same ``uuid.UUID`` object.
    """
    return uuid.uuid5(uuid.NAMESPACE_DNS, UuidApi)

# GET "/" handler on UserApp: builds a SELECT, executes it through the session,
# and reads the first scalar row of the result.
@UserApp.get("/")
async def query():
    # `Session` is the module-level instance created by SessionLocal(). The
    # traceback in this post reports AttributeError: __aenter__ on this line.
    async with Session as session:
        # NOTE(review): `model` is brought in by `from app.Fast_blog import model`,
        # i.e. a module/package, not a mapped class — presumably the `user`
        # class (and its UserId column) was intended; confirm.
        sql = select(model).where(model.id == 1)
        print(sql)
        result = await session.execute(sql)
        # `data` is assigned but never returned, so the handler returns None.
        data = result.scalars().first()
        # data = result.scalars().all()
Run Code Online (Sandbox Code Playgroud)

当我测试这个 query 函数时,遇到了一个问题:这个异步 SQLAlchemy 连接到底是成功还是失败?还是我的代码有问题?
它报告了如下错误:

Traceback (most recent call last):
  File "d:\python3\lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 404, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "d:\python3\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File "d:\python3\lib\site-packages\uvicorn\middleware\debug.py", line 106, in __call__
    raise exc from None
  File "d:\python3\lib\site-packages\uvicorn\middleware\debug.py", line 103, in __call__
    await self.app(scope, receive, inner_send)
  File "d:\python3\lib\site-packages\fastapi\applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "d:\python3\lib\site-packages\starlette\applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "d:\python3\lib\site-packages\starlette\middleware\errors.py", line 184, in __call__
    raise exc
  File "d:\python3\lib\site-packages\starlette\middleware\errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "d:\python3\lib\site-packages\starlette\middleware\exceptions.py", line 79, in __call__
    raise exc
  File "d:\python3\lib\site-packages\starlette\middleware\exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "d:\python3\lib\site-packages\fastapi\middleware\asyncexitstack.py", line 21, in __call__
    raise e
  File "d:\python3\lib\site-packages\fastapi\middleware\asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "d:\python3\lib\site-packages\starlette\routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File "d:\python3\lib\site-packages\starlette\routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "d:\python3\lib\site-packages\starlette\routing.py", line 66, in app
    response = await func(request)
  File "d:\python3\lib\site-packages\fastapi\routing.py", line 237, in app
    raw_response = await run_endpoint_function(
  File "d:\python3\lib\site-packages\fastapi\routing.py", line 163, in run_endpoint_function
    return await dependant.call(**values)
  File ".\Fast_blog\apps\User_app\user.py", line 37, in query
    async with Session as session:
AttributeError: __aenter__
Run Code Online (Sandbox Code Playgroud)

请帮助我,指出我的代码哪里不正确。我也尝试过用谷歌搜索,但搜到的情况似乎和我的问题不一样。

pyt*_*ser 5

您使用 SessionLocal = sessionmaker(bind=engine,autocommit=False,autoflush=False) 创建的是一个同步会话,但之后却把它当作异步会话来使用。

我假设您使用的是 SQLAlchemy 1.4。如果是这样,您需要按照此处文档所述传入 class_=AsyncSession:https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#asyncio-scoped-session

from asyncio import current_task
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_scoped_session
from sqlalchemy.orm import sessionmaker

# Async engine. Sql_URL is not defined in this snippet and must be supplied
# by the caller's configuration.
engine = create_async_engine(Sql_URL)

# class_=AsyncSession makes the factory produce AsyncSession objects, which is
# what `async with` / `await session.execute(...)` require.
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False, class_=AsyncSession)
# Scoped registry keyed by asyncio.current_task, passed in as scopefunc.
db_session = async_scoped_session(SessionLocal, scopefunc=current_task)
Run Code Online (Sandbox Code Playgroud)