Pau*_*aul 74 python mysql sqlalchemy
我有一个~10M记录的MySQL表,我使用SqlAlchemy进行交互.我发现对这个表的大型子集的查询将占用太多内存,即使我认为我使用的是内置生成器,智能地获取数据集的一口大小的块:
for thing in session.query(Things):
analyze(thing)
Run Code Online (Sandbox Code Playgroud)
为了避免这种情况,我发现我必须构建自己的迭代器,这些迭代器会以块的形式出现:
lastThingID = None
while True:
things = query.filter(Thing.id < lastThingID).limit(querySize).all()
if not rows or len(rows) == 0:
break
for thing in things:
lastThingID = row.id
analyze(thing)
Run Code Online (Sandbox Code Playgroud)
这是正常的还是有关于SA内置发电机我缺少的东西?
这个问题的答案似乎表明内存消耗是不可预期的.
zzz*_*eek 108
大多数DBAPI实现在获取行时完全缓冲行 - 通常,在SQLAlchemy ORM甚至获得一个结果之前,整个结果集都在内存中.
但是,Query的工作方式是它在返回给你的对象之前默认完全加载给定的结果集.这里的基本原理认为查询不仅仅是简单的SELECT语句 - 连接到其他表,这些表可能在一个结果集中多次返回相同的对象标识(与急切加载相同),整个行集需要在内存中,所以可以返回正确的结果 - 否则集合等可能只是部分填充.
因此Query提供了一个更改此行为的选项,即yield_per()调用http://www.sqlalchemy.org/docs/orm/query.html?highlight=yield_per#sqlalchemy.orm.query.Query.yield_per.此调用将导致Query批量生成行,您可以在其中为批量指定大小.正如文档所述,这只适用于你没有做任何急切的集合加载 - 所以基本上如果你真的知道你在做什么.而且,如果底层DBAPI预先缓冲行,那么仍然会有内存开销,因此该方法仅比不使用它更好地扩展.
我几乎没有使用yield_per() - 相反,我使用上面使用窗口函数建议的LIMIT方法的更好版本.LIMIT和OFFSET有一个巨大的问题,即非常大的OFFSET值会导致查询变得越来越慢,因为N的OFFSET会导致它遍历N行 - 就像执行相同的查询五十次而不是一次,每次读取一次行数越来越多.使用窗口函数方法,我预取一组"窗口"值,这些值引用我想要选择的表的块.然后我发出单独的SELECT语句,每个语句一次从其中一个窗口拉出.
窗口函数方法在维基上http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowedRangeQuery,我使用它非常成功.
另请注意,并非所有数据库都支持窗口功能 - 您需要PG,Oracle或SQL Server.恕我直言至少使用Postgresql绝对是值得的 - 如果你使用关系数据库,你也可以使用最好的.
Joe*_*oel 13
我一直在研究SQLAlchemy的高效遍历/分页,并希望更新这个答案.
我认为你可以使用切片调用来正确地限制查询的范围,你可以有效地重用它.
例:
window_size = 10 # or whatever limit you like
window_idx = 0
while True:
start,stop = window_size*window_idx, window_size*(window_idx+1)
things = query.slice(start, stop).all()
if things is None:
break
for thing in things:
analyze(thing)
if len(things) < window_size:
break
window_idx += 1
Run Code Online (Sandbox Code Playgroud)
edo*_*ron 13
我不是数据库专家,但是当使用SQLAlchemy作为简单的Python抽象层(即,不使用ORM查询对象)时,我想出了一个令人满意的解决方案,可以查询300M行表而不会增加内存使用量...
这是一个虚拟的示例:
from sqlalchemy import create_engine, select
conn = create_engine("DB URL...").connect()
q = select([huge_table])
proxy = conn.execution_options(stream_results=True).execute(q)
Run Code Online (Sandbox Code Playgroud)
然后,我使用SQLAlchemy fetchmany()
方法在无限while
循环中遍历结果:
while 'batch not empty': # equivalent of 'while True', but clearer
batch = proxy.fetchmany(100000) # 100,000 rows at a time
if not batch:
break
for row in batch:
# Do your stuff here...
proxy.close()
Run Code Online (Sandbox Code Playgroud)
这种方法使我可以进行所有类型的数据聚合,而没有任何危险的内存开销。
NOTE
在stream_results
与Postgres的和工作pyscopg2
适配器,但我想这不会有任何DBAPI工作,也没有与任何数据库驱动程序...
这篇博客文章中有一个有趣的用例,启发了我的上述方法。
本着Joel的回答,我使用以下内容:
WINDOW_SIZE = 1000
def qgen(query):
start = 0
while True:
stop = start + WINDOW_SIZE
things = query.slice(start, stop).all()
if things is None:
break
for thing in things:
yield(thing)
start += WINDOW_SIZE
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
44659 次 |
最近记录: |