SQLAlchemy 批量更新策略

Mat*_*ton 3 mysql performance orm sqlalchemy flask-sqlalchemy

我目前正在使用 SQLAlchemy(在 GAE 上,连接到 Google 的云 MySQL)编写一个 Web 应用程序(Flask)并且需要对表进行批量更新。简而言之,完成了许多计算,导致需要在 1000 个对象上更新单个值。目前我正在一个事务中完成这一切,但最终,刷新/提交需要很长时间。

该表有一个索引id,这一切都在单个事务中执行。所以我相信我已经避免了通常的错误,但仍然很慢。

INFO     2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656}
Run Code Online (Sandbox Code Playgroud)

根据我的理解,实际上无法在 SQL 中进行批量更新,并且上面的语句最终将多个 UPDATE 语句发送到服务器。

我试过使用,Session.bulk_update_mappings()但这似乎并没有真正做任何事情:( 不知道为什么,但更新从未真正发生过。我看不到任何实际使用这种方法的例子(包括在性能套件中)所以没有确定它是否打算使用。

我见过讨论过的一种技术是在另一个表中进行批量插入,然后进行 UPDATE JOIN。我已经给它做了一个测试,如下所示,它似乎要快得多。

wallets = db_session.query(Wallet).all()
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ]
db_session.bulk_save_objects(ledgers)
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
db_session.execute('TRUNCATE ledger')
Run Code Online (Sandbox Code Playgroud)

但现在的问题是如何构建我的代码。我正在使用 ORM,我需要以某种方式不“弄脏”原始Wallet对象,以便它们不会以旧方式提交。我可以Ledger改为创建这些对象并保留它们的列表,然后在批量操作结束时手动插入它们。但这几乎闻起来像是我在复制 ORM 机制的一些工作。

有没有更聪明的方法来做到这一点?到目前为止,我的大脑正在下降,例如:

class Wallet(Base):
    ...
    _balance = Column(Float)
    ...

@property
def balance(self):
    # first check if we have a ledger of the same id
    # and return the amount in that, otherwise...
    return self._balance

@balance.setter
def balance(self, amount):
    l = Ledger(id=self.id, amount=amount)
    # add l to a list somewhere then process later

# At the end of the transaction, do a bulk insert of Ledgers
# and then do an UPDATE JOIN and TRUNCATE
Run Code Online (Sandbox Code Playgroud)

正如我所说,这一切似乎都在与我(可能)拥有的工具作斗争。有没有更好的方法来处理这个问题?我可以利用 ORM 机制来做到这一点吗?或者有没有更好的方法来进行批量更新?

编辑:或者活动和会议是否有一些聪明的地方?也许before_flush?

编辑 2:所以我试图利用事件机制,现在有了这个:

@event.listens_for(SignallingSession, 'before_flush')
def before_flush(session, flush_context, instances):
    ledgers = []

    if session.dirty:
        for elem in session.dirty:
            if ( session.is_modified(elem, include_collections=False) ):
                if isinstance(elem, Wallet):
                    session.expunge(elem)
                    ledgers.append(Ledger(id=elem.id, amount=elem.balance))

    if ledgers:
        session.bulk_save_objects(ledgers)
        session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
        session.execute('TRUNCATE ledger')
Run Code Online (Sandbox Code Playgroud)

这对我来说似乎很hacky和邪恶,但似乎工作正常。任何陷阱,或更好的方法?

-马特

uni*_*rio 5

您本质上所做的是绕过 ORM 以优化性能。因此,不要对您“复制 ORM 正在做的工作”感到惊讶,因为这正是您需要做的。

除非你有很多地方需要像这样进行批量更新,否则我建议不要使用魔法事件方法;简单地编写显式查询要简单得多。

我建议做的是使用 SQLAlchemy Core 而不是 ORM 来进行更新:

ledger = Table("ledger", db.metadata,
    Column("wallet_id", Integer, primary_key=True),
    Column("new_balance", Float),
    prefixes=["TEMPORARY"],
)


wallets = db_session.query(Wallet).all()

# figure out new balances
balance_map = {}
for w in wallets:
    balance_map[w.id] = calculate_new_balance(w)

# create temp table with balances we need to update
ledger.create(bind=db.session.get_bind())

# insert update data
db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v}
                                           for k, v in balance_map.items()])

# perform update
db.session.execute(Wallet.__table__
                         .update()
                         .values(balance=ledger.c.new_balance)
                         .where(Wallet.__table__.c.id == ledger.c.wallet_id))

# drop temp table
ledger.drop(bind=db.session.get_bind())

# commit changes
db.session.commit()
Run Code Online (Sandbox Code Playgroud)