在 FastAPI 中使用数据库依赖项,而无需通过函数树传递它

Max*_*urt 20 python database sqlalchemy fastapi

我目前正在复杂系统上使用 FastAPI 进行 POC。该项目业务逻辑繁重,完成后将与 50 多个不同的数据库表进行交互。每个模型都有一个服务,一些更复杂的业务逻辑有自己的服务(然后通过特定于模型的服务与不同的表进行交互/查询)。

虽然一切正常,但我的团队的一些成员对 Session 对象的依赖注入提出了一些反对。最大的问题主要是必须将会话从控制器传递到服务、第二个服务以及(在少数情况下)进一步的第三个服务。在这些情况下,中间服务功能往往没有数据库查询但他们调用其他服务的功能可能有一些。抱怨主要在于这更难以维护,并且必须到处传递数据库对象似乎毫无用处的重复。

代码示例:

Databases/mysql.py(项目中的 3 个数据库之一)

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

def get_uri():
    return 'the mysql uri'

engine = create_engine(get_uri())

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()


def get_db():
    db: Session = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
    finally:
        db.close()

Run Code Online (Sandbox Code Playgroud)

控制器/controller1.py

from fastapi import APIRouter, HTTPException, Path, Depends
from sqlalchemy.orm import Session
from services.mysql.bar import get_bar_by_id
from services.mysql.process_x import bar_process
from databases.mysql import get_db

router = APIRouter(prefix='/foo')


@router.get('/bar/{bar_id}')
def process_bar(bar_id: int = Path(..., title='The ID of the bar to process', ge=1),
                          mysql_session: Session = Depends(get_db)):
    # From the crontroller, to a service which only runs a query. This is fine.
    bar = get_bar_by_id(bar_id, mysql_session)

    if bar is None:
        raise HTTPException(status_code=404,
                            detail='Bar not found for id: {bar_id}'.format(bar_id=bar_id))

    # This one calls a function in a service which has a lot of business logic but no queries
    processed_bar = bar_process(bar, mysql_session)

    return processed_bar

Run Code Online (Sandbox Code Playgroud)

服务/mysql/process_x.py

from .process_bar import process_the_bar
from models.mysql.w import W
from models.mysql.bar import Bar
from models.mysql.y import Y
from models.mysql.z import Z
from sqlalchemy.orm import Session


def w_process(w: W, mysql_session: Session):
    ...


def bar_process(bar: Bar, mysql_session: Session):
    # Very simplified, there's actually 5 conditional branching service calls here
    return process_the_bar(bar, mysql_session)


def y_process(y: Y, mysql_session: Session):
    ...


def z_process(z: Z, mysql_session: Session):
    ...

Run Code Online (Sandbox Code Playgroud)

服务/mysql/process_bar.py

from . import model_service1
from . import model_service2
from . import model_service3
from . import additional_rules_service
from libraries.bar_functions import do_thing_to_bar
from models.mysql.bar import Bar
from sqlalchemy.orm import Session


def process_the_bar(bar: bar, mysql_session: Session):
    process_result = list()

    # Many processing steps, not all of them require db and might work on the bar directly
    process_result.append(process1(bar, mysql_session))
    process_result.append(process2(bar, mysql_session))
    process_result.append(process3(bar, mysql_session))
    process_result.append(process4(bar))
    process_result.append(...(bar))
    process_result.append(processY(bar))


def process1(bar: Bar, mysql_session: Session):
    return model_service1.do_something(bar.val, mysql_session)


def process2(bar: Bar, mysql_session: Session):
    return model_service2.do_something(bar.val, mysql_session)


def process3(bar: Bar, mysql_session: Session):
    return model_service3.do_something(bar.val, mysql_session)

def process4-Y(bar: Bar, mysql_session: Session):
    # do something using the bar library, or maybe on another service with no queries
    return list()
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,在使用这种方法时,我们被困在传递 mysql_session 并让它在各处重复。

这是我想到的两个解决方案:

  1. 将数据库会话添加到 Starlette 请求状态

我可以通过 app.startup 事件(https://fastapi.tiangolo.com/advanced/events/)或中间件来完成此操作。但是,它确实意味着以类似的方式来回传递请求状态(如果我的理解是正确的)

  1. 使用上下文管理器的会话范围方法

几乎,我会将 get_db 函数转换为上下文管理器,而不是将其作为依赖项注入。到目前为止,最干净的最终结果,但是它完全违背了跨请求共享单个数据库会话的概念。

我已经考虑过使用编码/数据库的完全异步方法,如 FastAPI 文档(https://fastapi.tiangolo.com/advanced/async-sql-databases/)所示,但是我们正在 SqlAlchemy 上使用的数据库之一通过插件使用,我假设不支持开箱即用的异步(Vertica)。如果我错了,那么我可以考虑完全异步的方法。

所以最后,我想知道是否有可能在不影响每个请求单个会话的情况下完成一些“更干净”的事情?

Max*_*urt 11

我直接从FastAPI Github获得了一些帮助

正如用户 Insomne​​s 提到的,我想要做的事情可以通过使用ContextVar来实现。我已经在我的代码中尝试过了,它似乎工作得很好。