内存高效的内置SqlAlchemy迭代器/生成器?

Pau*_*aul 74 python mysql sqlalchemy

我有一个~10M记录的MySQL表,我使用SqlAlchemy进行交互.我发现对这个表的大型子集的查询将占用太多内存,即使我认为我使用的是内置生成器,智能地获取数据集的一口大小的块:

for thing in session.query(Things):
    analyze(thing)
Run Code Online (Sandbox Code Playgroud)

为了避免这种情况,我发现我必须构建自己的迭代器,这些迭代器会以块的形式出现:

lastThingID = None
while True:
    things = query.filter(Thing.id < lastThingID).limit(querySize).all()
    if not rows or len(rows) == 0: 
        break
    for thing in things:
        lastThingID = row.id
        analyze(thing)
Run Code Online (Sandbox Code Playgroud)

这是正常的还是有关于SA内置发电机我缺少的东西?

这个问题的答案似乎表明内存消耗是不可预期的.

zzz*_*eek 108

大多数DBAPI实现在获取行时完全缓冲行 - 通常,在SQLAlchemy ORM甚至获得一个结果之前,整个结果集都在内存中.

但是,Query的工作方式是它在返回给你的对象之前默认完全加载给定的结果集.这里的基本原理认为查询不仅仅是简单的SELECT语句 - 连接到其他表,这些表可能在一个结果集中多次返回相同的对象标识(与急切加载相同),整个行集需要在内存中,所以可以返回正确的结果 - 否则集合等可能只是部分填充.

因此Query提供了一个更改此行为的选项,即yield_per()调用http://www.sqlalchemy.org/docs/orm/query.html?highlight=yield_per#sqlalchemy.orm.query.Query.yield_per.此调用将导致Query批量生成行,您可以在其中为批量指定大小.正如文档所述,这只适用于你没有做任何急切的集合加载 - 所以基本上如果你真的知道你在做什么.而且,如果底层DBAPI预先缓冲行,那么仍然会有内存开销,因此该方法仅比不使用它更好地扩展.

我几乎没有使用yield_per() - 相反,我使用上面使用窗口函数建议的LIMIT方法的更好版本.LIMIT和OFFSET有一个巨大的问题,即非常大的OFFSET值会导致查询变得越来越慢,因为N的OFFSET会导致它遍历N行 - 就像执行相同的查询五十次而不是一次,每次读取一次行数越来越多.使用窗口函数方法,我预取一组"窗口"值,这些值引用我想要选择的表的块.然后我发出单独的SELECT语句,每个语句一次从其中一个窗口拉出.

窗口函数方法在维基上http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowedRangeQuery,我使用它非常成功.

另请注意,并非所有数据库都支持窗口功能 - 您需要PG,Oracle或SQL Server.恕我直言至少使用Postgresql绝对是值得的 - 如果你使用关系数据库,你也可以使用最好的.


Joe*_*oel 13

我一直在研究SQLAlchemy的高效遍历/分页,并希望更新这个答案.

我认为你可以使用切片调用来正确地限制查询的范围,你可以有效地重用它.

例:

window_size = 10  # or whatever limit you like
window_idx = 0
while True:
    start,stop = window_size*window_idx, window_size*(window_idx+1)
    things = query.slice(start, stop).all()
    if things is None:
        break
    for thing in things:
        analyze(thing)
    if len(things) < window_size:
        break
    window_idx += 1
Run Code Online (Sandbox Code Playgroud)


edo*_*ron 13

我不是数据库专家,但是当使用SQLAlchemy作为简单的Python抽象层(即,不使用ORM查询对象)时,我想出了一个令人满意的解决方案,可以查询300M行表而不会增加内存使用量...

这是一个虚拟的示例:

from sqlalchemy import create_engine, select

conn = create_engine("DB URL...").connect()
q = select([huge_table])

proxy = conn.execution_options(stream_results=True).execute(q)
Run Code Online (Sandbox Code Playgroud)

然后,我使用SQLAlchemy fetchmany()方法在无限while循环中遍历结果:

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time

    if not batch:
        break

    for row in batch:
        # Do your stuff here...

proxy.close()
Run Code Online (Sandbox Code Playgroud)

这种方法使我可以进行所有类型的数据聚合,而没有任何危险的内存开销。

NOTE stream_results与Postgres的和工作pyscopg2适配器,但我想这不会有任何DBAPI工作,也没有与任何数据库驱动程序...

这篇博客文章中有一个有趣的用例,启发了我的上述方法。

  • 如果有人正在使用 postgres 或 mysql(使用`pymysql`),恕我直言,这应该是公认的答案。 (2认同)
  • 救了我的命,看到我的查询运行得越来越慢。我已经在 pyodbc 上检测了上面的内容(从 sql server 到 postgres),它运行得像梦一样。 (2认同)
  • 这对我来说是最好的方法。由于我使用 ORM,我需要将 SQL 编译为我的方言 (Postgres),然后直接从连接(而不是从会话)执行,如上所示。我在另一个问题中找到的编译“如何”/sf/ask/323210401/。速度的提高很大。从 JOINS 更改为 SUBQUERIES 也大大提高了性能。还建议使用 sqlalchemy_mixins,使用 smart_query 对构建最有效的查询有很大帮助。https://github.com/absent1706/sqlalchemy-mixins (2认同)

Pie*_*ton 7

本着Joel的回答,我使用以下内容:

WINDOW_SIZE = 1000
def qgen(query):
    start = 0
    while True:
        stop = start + WINDOW_SIZE
        things = query.slice(start, stop).all()
        if things is None:
            break
        for thing in things:
            yield(thing)
        start += WINDOW_SIZE
Run Code Online (Sandbox Code Playgroud)