可以使用SQLAlchemy事件来更新非规范化数据缓存吗?

Eli*_*ght 10 python sqlalchemy denormalization

出于性能原因,我有一个非规范化数据库,其中一些表包含从其他表中的许多行聚合的数据.我想通过使用SQLAlchemy事件来维护这个非规范化的数据缓存.例如,假设我正在编写论坛软件,并希望每个Thread人都有一个列跟踪线程中所有注释的组合字数,以便有效地显示该信息:

class Thread(Base):
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    title = Column(UnicodeText(), nullable=False)
    word_count = Column(Integer, nullable=False, default=0)

class Comment(Base):
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    thread_id = Column(UUID, ForeignKey('thread.id', ondelete='CASCADE'), nullable=False)
    thread = relationship('Thread', backref='comments')
    message = Column(UnicodeText(), nullable=False)

    @property
    def word_count(self):
        return len(self.message.split())
Run Code Online (Sandbox Code Playgroud)

因此,每次插入注释时(为简单起见,我们都要说注释永远不会被编辑或删除),我们希望更新word_count相关Thread对象的属性.所以我想做点什么

def after_insert(mapper, connection, target):
    thread = target.thread
    thread.word_count = sum(c.word_count for c in thread.comments)
    print "updated cached word count to", thread.word_count

event.listen(Comment, "after_insert", after_insert)
Run Code Online (Sandbox Code Playgroud)

因此,当我插入a时Comment,我可以看到事件触发并看到它已正确计算字数,但该更改未保存到Thread数据库中的行.我没有看到关于after_insert文档中更新的其他表的任何警告,但我确实看到其他一些警告,例如after_delete.

那么有一种支持的方法来使用SQLAlchemy事件吗?我已经将SQLAlchemy事件用于许多其他事情了,所以我想做所有这些事情,而不是必须编写数据库触发器.

zzz*_*eek 36

在after_insert()事件是做到这一点的一种方式,你可能会注意到它传递一个SQLAlchemy的Connection,而不是一个对象,Session如与其他平齐相关事件的情况.映射器级刷新事件通常用于直接在给定的上调用SQL Connection:

@event.listens_for(Comment, "after_insert")
def after_insert(mapper, connection, target):
    thread_table = Thread.__table__
    thread = target.thread
    connection.execute(
            thread_table.update().
             where(thread_table.c.id==thread.id).
             values(word_count=sum(c.word_count for c in thread.comments))
    )
    print "updated cached word count to", thread.word_count
Run Code Online (Sandbox Code Playgroud)

值得注意的是,直接调用UPDATE语句也比在整个工作单元流程中再次运行该属性更加高效.

但是,这里并不真正需要像after_insert()这样的事件,因为我们知道在刷新之前"word_count"的值会发生.我们实际上知道它是注释和Thread对象相互关联,我们也可以使用属性事件始终在内存中保持Thread.word_count完全新鲜:

def _word_count(msg):
    return len(msg.split())

@event.listens_for(Comment.message, "set")
def set(target, value, oldvalue, initiator):
    if target.thread is not None:
        target.thread.word_count += (_word_count(value) - _word_count(oldvalue))

@event.listens_for(Comment.thread, "set")
def set(target, value, oldvalue, initiator):
    # the new Thread, if any
    if value is not None:
        value.word_count += _word_count(target.message)

    # the old Thread, if any
    if oldvalue is not None:
        oldvalue.word_count -= _word_count(target.message)
Run Code Online (Sandbox Code Playgroud)

这种方法的最大优点是也没有必要遍历thread.comments,对于卸载的集合,这意味着发出另一个SELECT.

还有一种方法是在before_flush()中完成.下面是一个快速而又脏的版本,可以对其进行细化,以便更仔细地分析已更改的内容,以确定word_count是否需要更新:

@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    for obj in session.new | session.dirty:
        if isinstance(obj, Thread):
            obj.word_count = sum(c.word_count for c in obj.comments)
        elif isinstance(obj, Comment):
            obj.thread.word_count = sum(c.word_count for c in obj.comments)
Run Code Online (Sandbox Code Playgroud)

我会使用属性事件方法,因为它是最高性能和最新的.

  • 如果你真的希望UPDATE是"原子的",这样即使没有可序列化的隔离也不会发生竞争条件,那么你需要在所有线程注释上针对子查询运行它,而不是在thread.comments上的内存中迭代,这可能不是完整的集合. (2认同)

mwh*_*ite 5

您可以使用 SQLAlchemy-Utilsaggregated列执行此操作:http : //sqlalchemy-utils.readthedocs.org/en/latest/aggregates.html

  • 现在一切又回到了原点。该函数的文档指定了这个堆栈溢出答案作为灵感 (3认同)