我有一系列由 Python/SQLAlchemy 维护的相关表。如果我删除特定表中的一行,我希望能够在将来的某个时间点撤销该删除操作,以防出现错误。我可以使用 is_deleted 列并对其进行过滤来做到这一点,但是当我查询其他表以获取相关数据时,这变得很痛苦。我可以向所有其他表添加一个 is_deleted 列,当主表中的一行被删除时,将它们全部切换。但是对于所有表上的每个查询,我都必须对 is_deleted 进行过滤。可以做到,但我希望有更好的策略。
一个想法是将所有已删除的数据移动到另一组表中,该表仅存储已删除的数据。但是我不清楚 SQLAlchemy 是否允许我切换与特定对象关联的表。我认为这将是首选的解决方案,但我不知道是否可以完成。
另一个想法是我可以运行第二个数据库,并将删除的数据复制过来。但这增加了一层我想避免的管理复杂性。
任何想法将不胜感激。
很多人都在做“is_deleted”的事情,我同意我也不喜欢它,尽管我们在PreFilteredQuery有一个食谱。
正如其他人所建议的那样,您正在寻找的是“版本控制”配方。我们在 SQLAlchemy 文档中的版本化对象中介绍了将数据副本存储在单独的版本化表中的综合示例。
在这里,我调整了该示例中使用的一些技术,以生成一个更直接的配方,专门只跟踪“已删除”的对象,并包括一个“恢复”功能,它将“恢复”给定的行回到主表。所以它不是“SQLAlchemy 允许我切换与特定对象关联的表”,它更像是创建了另一个映射类,它类似于主类,也可用于根据您的请求“反转”删除. 线下的所有内容__main__都是概念验证。
from sqlalchemy.orm import Session, object_session
from sqlalchemy import event
def preserve_deleted(class_):
def copy_col(col):
newcol = col.copy()
newcol.constraints = set()
return newcol
keys = class_.__table__.c.keys()
cols = dict(
(col.key, copy_col(col)) for col in class_.__table__.c
)
cols['__tablename__'] = "%s_deleted" % class_.__table__.name
class History(object):
def restore(self):
sess = object_session(self)
sess.delete(self)
sess.add(copy_inst(self, class_))
hist_class = type(
'%sDeleted' % class_.__name__,
(History, Base),
cols)
def copy_inst(fromobj, tocls):
return tocls(**dict(
(key, getattr(fromobj, key))
for key in keys
))
@event.listens_for(Session, 'before_flush')
def check_deleted(session, flush_context, instances):
for del_ in session.deleted:
if isinstance(del_, class_):
h = copy_inst(del_, hist_class)
session.add(h)
class_.deleted = hist_class
return class_
if __name__ == '__main__':
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship, Session
from sqlalchemy import create_engine
Base = declarative_base()
@preserve_deleted
class A(Base):
__tablename__ = "a"
id = Column(Integer, primary_key=True)
data1 = Column(String)
data2 = Column(String)
@preserve_deleted
class B(Base):
__tablename__ = 'b'
id = Column(Integer, primary_key=True)
data1 = Column(String)
a_id = Column(Integer, ForeignKey('a.id'))
a = relationship("A")
e = create_engine('sqlite://', echo=True)
Base.metadata.create_all(e)
s = Session(e)
a1, a2, a3, a4 = \
A(data1='a1d1', data2='a1d2'),\
A(data1='a2d1', data2='a2d2'),\
A(data1='a3d1', data2='a3d2'),\
A(data1='a4d1', data2='a4d2')
b1, b2, b3, b4 = \
B(data1='b1', a=a1),\
B(data1='b2', a=a1),\
B(data1='b3', a=a3),\
B(data1='b4', a=a4)
s.add_all([
a1, a2, a3, a4,
b1, b2, b3, b4
])
s.commit()
assert s.query(A.id).order_by(A.id).all() == [(1, ), (2, ), (3, ), (4, )]
assert s.query(B.id).order_by(B.id).all() == [(1, ), (2, ), (3, ), (4, )]
s.delete(a2)
s.delete(b2)
s.delete(b3)
s.delete(a3)
s.commit()
assert s.query(A.id).order_by(A.id).all() == [(1, ), (4, )]
assert s.query(B.id).order_by(B.id).all() == [(1, ), (4, )]
a2_deleted = s.query(A.deleted).filter(A.deleted.id == 2).one()
a2_deleted.restore()
b3_deleted = s.query(B.deleted).filter(B.deleted.id == 3).one()
a3_deleted = s.query(A.deleted).filter(A.deleted.id == 3).one()
b3_deleted.restore()
a3_deleted.restore()
s.commit()
assert s.query(A.id).order_by(A.id).all() == [(1, ), (2, ), (3, ), (4, )]
assert s.query(B.id).order_by(B.id).all() == [(1, ), (3, ), (4, )]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
973 次 |
| 最近记录: |