具有多个绑定的 SQLAlchemy - 动态选择绑定到查询

yan*_*vps 6 python sqlalchemy flask flask-sqlalchemy

我有 4 个不同的数据库,每个数据库对应我的一位客户(医疗诊所),所有这些数据库都具有完全相同的结构。

Patient在我的应用程序中,我有、DoctorAppointment等模型。

我们以其中一个为例:

class Patient(db.Model):
    __tablename__ = "patients"

    id = Column(Integer, primary_key=True)
    first_name = Column(String, index=True)
    last_name = Column(String, index=True)
    date_of_birth = Column(Date, index=True)
Run Code Online (Sandbox Code Playgroud)

我发现在绑定的帮助下,我可以创建不同的数据库并将每个模型关联到不同的绑定。所以我有这样的配置:

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:pass@localhost/main'
app.config['SQLALCHEMY_BINDS'] = {
    'clinic1':'mysql://user:pass@localhost/clinic1',
    'clinic2':'mysql://user:pass@localhost/clinic2',
    'clinic3':'mysql://user:pass@localhost/clinic3',
    'clinic4':'mysql://user:pass@localhost/clinic4'
}
Run Code Online (Sandbox Code Playgroud)

现在我正在努力实现两件事:

  1. 我希望当我使用db.create_all()它创建表时将在所有 4 个数据库中创建patients表(clinic1->clinic4)
  2. 我希望能够动态选择特定的绑定(在运行时),以便任何查询(例如)Patient.query.filter().count()将针对所选的绑定数据库运行

理想情况下,它的行为如下:

with DbContext(bind='client1'):
    patients_count = Patient.query.filter().count()
    print(patients_count)

# outside of the `with` context we are back to the default bind
Run Code Online (Sandbox Code Playgroud)

但是,这样做:

patients_count = Patient.query.filter().count()
Run Code Online (Sandbox Code Playgroud)

不指定绑定,将引发错误(因为patients默认绑定中不存在该表)

任何可以指导如何完成此操作的代码示例将不胜感激!

PS 您可能会建议不要使用不同的数据库,而是使用具有不同列/表的数据库,但请坚持我的示例,并尝试解释如何使用多个相同数据库的这种模式来完成此操作,谢谢

aar*_*ron 14

1.在所有绑定中创建表

观察:db.create_all()调用self.get_tables_for_bind()

解决方案:覆盖SQLAlchemy get_tables_for_bind()以支持'__all__'.

class MySQLAlchemy(SQLAlchemy):

    def get_tables_for_bind(self, bind=None):
        result = []
        for table in self.Model.metadata.tables.values():
            # if table.info.get('bind_key') == bind:
            if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
                result.append(table)
        return result
Run Code Online (Sandbox Code Playgroud)

用法:

# db = SQLAlchemy(app)  # Replace this
db = MySQLAlchemy(app)  # with this

db.create_all()
Run Code Online (Sandbox Code Playgroud)

2.动态选择特定的绑定

观察:SignallingSession get_bind()负责确定绑定。

解决方案:

  1. 覆盖SignallingSession get_bind()以从某个上下文获取绑定密钥。
  2. 重写SQLAlchemy create_session()以使用我们的自定义会话类。
  3. 支持上下文选择特定的绑定db以实现可访问性。
  4. '__all__'通过覆盖SQLAlchemy get_binds()恢复默认引擎,强制为具有绑定键的表指定上下文。
class MySignallingSession(SignallingSession):
    def __init__(self, db, *args, **kwargs):
        super().__init__(db, *args, **kwargs)
        self.db = db

    def get_bind(self, mapper=None, clause=None):
        if mapper is not None:
            info = getattr(mapper.persist_selectable, 'info', {})
            if info.get('bind_key') == '__all__':
                info['bind_key'] = self.db.context_bind_key
                try:
                    return super().get_bind(mapper=mapper, clause=clause)
                finally:
                    info['bind_key'] = '__all__'
        return super().get_bind(mapper=mapper, clause=clause)


class MySQLAlchemy(SQLAlchemy):
    context_bind_key = None

    @contextmanager
    def context(self, bind=None):
        _context_bind_key = self.context_bind_key
        try:
            self.context_bind_key = bind
            yield
        finally:
            self.context_bind_key = _context_bind_key

    def create_session(self, options):
        return orm.sessionmaker(class_=MySignallingSession, db=self, **options)

    def get_binds(self, app=None):
        binds = super().get_binds(app=app)
        # Restore default engine for table.info.get('bind_key') == '__all__'
        app = self.get_app(app)
        engine = self.get_engine(app, None)
        tables = self.get_tables_for_bind('__all__')
        binds.update(dict((table, engine) for table in tables))
        return binds

    def get_tables_for_bind(self, bind=None):
        result = []
        for table in self.Model.metadata.tables.values():
            if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
                result.append(table)
        return result
Run Code Online (Sandbox Code Playgroud)

用法:

class Patient(db.Model):
    __tablename__ = "patients"
    __bind_key__ = "__all__"  # Add this
Run Code Online (Sandbox Code Playgroud)

测试用例:

with db.context(bind='clinic1'):
    db.session.add(Patient())
    db.session.flush()         # Flush in 'clinic1'
    with db.context(bind='clinic2'):
        patients_count = Patient.query.filter().count()
        print(patients_count)  # 0 in 'clinic2'
    patients_count = Patient.query.filter().count()
    print(patients_count)      # 1 in 'clinic1'
Run Code Online (Sandbox Code Playgroud)

关于引用默认绑定的外键

您必须指定schema.

限制:

  • MySQL:
    • 绑定必须位于同一个 MySQL 实例中。否则,它必须是一个普通的列。
    • 默认绑定中的外部对象必须已经提交。
      否则,当插入引用它的对象时,您将收到此锁定错误:

      MySQLdb._exceptions.OperationalError:(1205,'超出锁定等待超时;尝试重新启动事务')

  • SQLite:不强制执行跨数据库的外键。

用法:

class Patient(db.Model):
    __tablename__ = "patients"
    __bind_key__ = "__all__"  # Add this
Run Code Online (Sandbox Code Playgroud)

测试用例:

with db.context(bind='clinic1'):
    db.session.add(Patient())
    db.session.flush()         # Flush in 'clinic1'
    with db.context(bind='clinic2'):
        patients_count = Patient.query.filter().count()
        print(patients_count)  # 0 in 'clinic2'
    patients_count = Patient.query.filter().count()
    print(patients_count)      # 1 in 'clinic1'
Run Code Online (Sandbox Code Playgroud)