跟踪SQLAlchemy中的模型更改

V. *_*ula 19 python sqlalchemy unit-of-work

我想记录一些SQLAlchemy-Models将要完成的操作.

所以,我有一个after_insert,after_delete和before_update钩子,在那里我将保存模型的先前和当前表示,

def keep_logs(cls):
    @event.listens_for(cls, 'after_delete')
    def after_delete_trigger(mapper, connection, target):
        pass

    @event.listens_for(cls, 'after_insert')
    def after_insert_trigger(mapper, connection, target):
        pass

    @event.listens_for(cls, 'before_update')
    def before_update_trigger(mapper, connection, target):
        prev = cls.query.filter_by(id=target.id).one()
        # comparing previous and current model


MODELS_TO_LOGGING = (
    User,
)
for cls in MODELS_TO_LOGGING:
    keep_logs(cls)
Run Code Online (Sandbox Code Playgroud)

但是有一个问题:当我试图在before_update钩子中找到模型时,SQLA返回修改(脏)版本.如何在更新之前获取以前版本的模型?是否有不同的方法来保持模型更改?

谢谢!

dav*_*ism 22

SQLAlchemy跟踪每个属性的更改.您不需要(也不应该)在事件中再次查询实例.此外,即使该修改不会更改任何数据,也会针对已修改的任何实例触发事件.遍历每列,检查它是否已被修改,并存储任何新值.

@event.listens_for(cls, 'before_update')
def before_udpate(mapper, connection, target):
    state = db.inspect(target)
    changes = {}

    for attr in state.attrs:
        hist = attr.load_history()

        if not hist.has_changes():
            continue

        # hist.deleted holds old value
        # hist.added holds new value
        changes[attr.key] = hist.added

    # now changes map keys to new values
Run Code Online (Sandbox Code Playgroud)


plo*_*man 9

我有一个类似的问题,但希望能够跟踪增量,因为对 sqlalchemy 模型进行了更改,而不仅仅是新值。我写了这个对davidism 的回答的轻微扩展,以及对beforeand 的稍微更好的处理after,因为它们有时是列表,有时是空元组:

  from sqlalchemy import inspect

  def get_model_changes(model):
    """
    Return a dictionary containing changes made to the model since it was 
    fetched from the database.

    The dictionary is of the form {'property_name': [old_value, new_value]}

    Example:
      user = get_user_by_id(420)
      >>> '<User id=402 email="business_email@gmail.com">'
      get_model_changes(user)
      >>> {}
      user.email = 'new_email@who-dis.biz'
      get_model_changes(user)
      >>> {'email': ['business_email@gmail.com', 'new_email@who-dis.biz']}
    """
    state = inspect(model)
    changes = {}
    for attr in state.attrs:
      hist = state.get_history(attr.key, True)

      if not hist.has_changes():
        continue

      old_value = hist.deleted[0] if hist.deleted else None
      new_value = hist.added[0] if hist.added else None
      changes[attr.key] = [old_value, new_value]

    return changes

  def has_model_changed(model):
    """
    Return True if there are any unsaved changes on the model.
    """
    return bool(get_model_changes(model))
Run Code Online (Sandbox Code Playgroud)