FastApi Sqlalchemy 如何管理事务(会话和多次提交)

Jos*_*din 5 python database transactions sqlalchemy fastapi

我有一个带有插入和更新功能的 CRUD commit,在每个功能的末尾,如下所示:

@staticmethod
def insert(db: Session, item: Item) -> None:
    db.add(item)
    db.commit()
   
   
@staticmethod
def update(db: Session, item: Item) -> None:
    ...
    db.commit()
Run Code Online (Sandbox Code Playgroud)

我有一个端点,它从 FastAPI 依赖项接收 sqlalchemy 会话,并且需要以原子方式插入和更新(数据库事务)。

处理事务时的最佳实践是什么?我无法使用 CRUD,因为它执行多个commit.

我应该如何处理交易?你在哪里提交你的会话?在 CRUD 中?还是在每个请求的 FastAPI 依赖函数中只有一次?

Gui*_*rre 6

我在使用 FastAPI 时遇到了同样的问题。我找不到在单独的方法中使用提交并让它们以事务性方式运行的方法。我最终做的是刷新而不是commit,它将更改发送到数据库,但不提交事务。

需要注意的一件事是,在 FastAPI 中,每个请求都会打开一个新会话并在完成后关闭它。这将是使用SQLAlchemy 文档中的示例所发生情况的粗略示例。

def run_my_program():
    # This happens in the `database = SessionLocal()` of the `get_db` method below
    session = Session()
    try:
        ThingOne().go(session)
        ThingTwo().go(session)

        session.commit()
    except:
        session.rollback()
        raise
    finally:
        # This is the same as the `get_db` method below
        session.close()
Run Code Online (Sandbox Code Playgroud)

为请求生成的会话已经是一个事务。当您提交该会话时,实际执行的是

当在默认模式 autocommit=False 下使用 Session 时,新事务将在提交后立即开始,但请注意,新开始的事务在第一个 SQL 实际发出之前不会使用任何连接资源。

在我阅读之后,我认为在端点范围内处理commit和是有意义的rollback

我创建了一个虚拟示例来说明这是如何工作的。我使用 FastAPI指南中的所有内容。

def create_user(db: Session, user: UserCreate):
    """
    Create user record
    """
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.flush() # Changed this to a flush
    return db_user
Run Code Online (Sandbox Code Playgroud)

然后在端点中使用 crud 操作如下

from typing import List
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session

...

def get_db():
    """
    Get SQLAlchemy database session
    """
    database = SessionLocal()
    try:
        yield database
    finally:
        database.close()

@router.post("/users", response_model=List[schemas.User])
def create_users(user_1: schemas.UserCreate, user_2: schemas.UserCreate, db: Session = Depends(get_db)):
    """
    Create two users
    """
    try:
        user_1 = crud.create_user(db=db, user=user_1)
        user_2 = crud.create_user(db=db, user=user_2)
        db.commit()
        return [user_1, user_2]
    except:
        db.rollback()
        raise HTTPException(status_code=400, detail="Duplicated user")
Run Code Online (Sandbox Code Playgroud)

将来我可能会调查将其移至中间件,但我认为使用commit您无法获得所需的行为。


Kee*_*thi 5

一种更Pythonic的方法是让上下文管理器根据是否存在异常来执行提交或回滚。

ATransaction是我们想要实现的目标的一个很好的抽象。

class Transaction:
    def __init__(self, session: Session = Depends(get_session)):
        self.session = session

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # rollback and let the exception propagate
            self.session.rollback()
            return False

        self.session.commit()
        return True

Run Code Online (Sandbox Code Playgroud)

并且,在您的 API 中使用它,如下所示:

def some_api(tx: Transaction = Depends(Transaction)):
    with tx:
        ThingOne().go()
        ThingTwo().go()

Run Code Online (Sandbox Code Playgroud)

无需将会话传递给 ThingOne 和 ThingTwo。将其注入到它们中,如下所示:

class ThingOne:
   def __init__(self, session: Session = Depends(get_session)):
       ...

class ThingTwo:
   def __init__(self, session: Session = Depends(get_session)):
       ...

Run Code Online (Sandbox Code Playgroud)

我还会在 API 中注入 ThingOne 和 ThingTwo:

def some_api(tx: Transaction = Depends(Transaction), 
            one: ThingOne = Depends(ThingOne), 
            two: ThingTwo = Depends(ThingTwo)):
    with tx:
        one.go()
        two.go()

Run Code Online (Sandbox Code Playgroud)