Python dependency injector: passing a single SQLAlchemy database connection per request

Far*_*oro 5 python dependency-injection sqlalchemy

I am trying to implement DI in my console application using dependency_injector.

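I have a rule that every request should use the same database connection for all of its dependencies. When a request comes in, a new database connection is created, so there is one DbConnection per request, which lets me make the request transactional (ACID).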

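Here is my code, which still creates a new database connection for every dependency. I don't want to use Singleton, because then only one connection is ever used.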

database.py

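from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

# DB_MYSQL_DSN is assumed to be defined elsewhere (it is not shown in the question).
engine_mysql = create_engine(DB_MYSQL_DSN, pool_size=20, max_overflow=0)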
session_mysql = sessionmaker(autocommit=False, autoflush=False, bind=engine_mysql)

class Database:
    _session_mysql: Session
    _session_psql: Session

    def __init__(self) -> None:
        print(100 * "=")
        print("SESSION CREATED")
        print(100 * "=")
        self._session_mysql = session_mysql()

    def get_mysql_session(self):
        print(100 * "=")
        print("SESSION CALL MYSQL")
        print(100 * "=")
        return self._session_mysql

database_container.py

class DatabaseContainer(containers.DeclarativeContainer):

    database = providers.Factory(Database)
    db_mysql = providers.Singleton(database.provided.get_mysql_session)

repository_container.py

class RepositoryContainer(containers.DeclarativeContainer):
    database = providers.DependenciesContainer()
    transaction = providers.Factory(TransactionRepository, session=database.db_mysql)
    transaction_package = providers.Factory(
      TransactionPackageRepository,
      session=database.db_psql,
    )

service_container.py

class ServiceContainer(containers.DeclarativeContainer):
    repository = providers.DependenciesContainer()
    database = providers.DependenciesContainer()

    order = providers.Factory(
        OrderService,
        # database
        db_conn=database.db_mysql,
        # repository
        transaction_repo=repository.transaction,
        transaction_package_repo=repository.transaction_package,
    )

app_container.py

class AppContainer(containers.DeclarativeContainer):

    database = providers.Container(DatabaseContainer)

    repository = providers.Container(RepositoryContainer, database=database)
    
    service = providers.Container(
        ServiceContainer,
        repository=repository,
        database=database
    )

Client code

class OrderWatcherJob(BaseJob):
    def __init__(self):
        self._logger = logging.getLogger(__name__)
        self.func = self.execute_job

    @inject
    def execute_job(
        self,
        order_service: OrderService = Provide[AppContainer.service.order],
        db_mysql:Session = Provide[AppContainer.database.db_mysql]
    ):

        try:
            order_service.execute_something()
            db_mysql.commit()
            #or order_service.db_conn.commit() 
        except Exception as ex:
            self._logger.error(ex)
            db_mysql.rollback()
            #or order_service.db_conn.rollback() 
        finally:
            db_mysql.close()
            #or order_service.db_conn.close()

So, how can I make it use only one database connection per request? I mean, when I inject OrderService, it should use the same database connection in TransactionRepo, TransactionPackageRepo, and db_conn.
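
To make the goal concrete, here is a minimal sketch of "one session per request" built only on the standard library's contextvars, not on dependency_injector itself. FakeSession, get_session and request_scope are hypothetical names introduced for illustration; FakeSession stands in for a SQLAlchemy Session so the sketch runs without a database. With real SQLAlchemy one would presumably pass the session_mysql sessionmaker from database.py as session_factory.

```python
import contextvars
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for sqlalchemy.orm.Session (assumption)."""

    def __init__(self):
        self.committed = False
        self.rolled_back = False
        self.closed = False

    def commit(self):
        self.committed = True

    def rollback(self):
        self.rolled_back = True

    def close(self):
        self.closed = True


# Holds the session that belongs to the request currently being handled.
_current_session = contextvars.ContextVar("current_session")


def get_session():
    """Return the session of the active request scope."""
    try:
        return _current_session.get()
    except LookupError:
        raise RuntimeError("no active request scope") from None


@contextmanager
def request_scope(session_factory=FakeSession):
    """Create one session per request; commit on success, roll back on error."""
    session = session_factory()
    token = _current_session.set(session)
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
        _current_session.reset(token)


class TransactionRepository:
    @property
    def session(self):
        # Looked up lazily, so it always resolves to the current request's session.
        return get_session()


class TransactionPackageRepository:
    @property
    def session(self):
        return get_session()


class OrderService:
    def __init__(self, transaction_repo, transaction_package_repo):
        self.transaction_repo = transaction_repo
        self.transaction_package_repo = transaction_package_repo

    @property
    def db_conn(self):
        return get_session()
```

A job would then wrap its work in `with request_scope(): order_service.execute_something()`, and the service and both repositories would see the same session until the block exits. dependency_injector also ships context-local providers such as providers.ContextLocalSingleton that may cover the same need inside the container; whether its reset behavior fits this job loop is something to check against the library documentation.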