Mar*_*tuc 8 python postgresql psycopg2 pandas
为了尽可能多地提供上下文,我试图将存储在远程postgres服务器(heroku)上的一些数据放入pandas DataFrame中,使用psycopg2进行连接.
我对两个特定的表,用户和事件感兴趣,并且连接工作正常,因为在下拉用户数据时
import pandas.io.sql as sql
# [...]
users = sql.read_sql("SELECT * FROM users", conn)
Run Code Online (Sandbox Code Playgroud)
等待几秒钟后,DataFrame按预期返回.
<class 'pandas.core.frame.DataFrame'>
Int64Index: 67458 entries, 0 to 67457
Data columns (total 35 columns): [...]
Run Code Online (Sandbox Code Playgroud)
然而,当试图直接从ipython中提取更大,更重的事件数据时,经过很长一段时间,它只会崩溃:
In [11]: events = sql.read_sql("SELECT * FROM events", conn)
vagrant@data-science-toolbox:~$
Run Code Online (Sandbox Code Playgroud)
当从iPython笔记本尝试时,我得到死核心错误
内核已经死了,你想重新启动吗?如果不重新启动内核,则可以保存笔记本,但在重新打开笔记本之前,运行代码将无法运行.
更新#1:
为了更好地了解我想要引入的事件表的大小,这里是记录的数量和每个的属性数量:
In [11]: sql.read_sql("SELECT count(*) FROM events", conn)
Out[11]:
count
0 2711453
In [12]: len(sql.read_sql("SELECT * FROM events LIMIT 1", conn).columns)
Out[12]: 18
Run Code Online (Sandbox Code Playgroud)
更新#2:
内存绝对是当前实现的瓶颈read_sql:当拉下事件并尝试运行另一个iPython实例时,结果是
vagrant@data-science-toolbox:~$ sudo ipython
-bash: fork: Cannot allocate memory
Run Code Online (Sandbox Code Playgroud)
更新#3:
我首先尝试了一个read_sql_chunked只返回部分DataFrames数组的实现:
def read_sql_chunked(query, conn, nrows, chunksize=1000):
start = 0
dfs = []
while start < nrows:
df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), conn)
start += chunksize
dfs.append(df)
print "Events added: %s to %s of %s" % (start-chunksize, start, nrows)
# print "concatenating dfs"
return dfs
event_dfs = read_sql_chunked("SELECT * FROM events", conn, events_count, 100000)
Run Code Online (Sandbox Code Playgroud)
并且运行良好,但在尝试连接DataFrame时,内核会再次死亡.
这是在给VM 2GB的RAM之后.
基于Andy read_sql对read_csv实现和性能差异的解释,接下来我尝试将记录附加到CSV中,然后将它们全部读入DataFrame:
event_dfs[0].to_csv(path+'new_events.csv', encoding='utf-8')
for df in event_dfs[1:]:
df.to_csv(path+'new_events.csv', mode='a', header=False, encoding='utf-8')
Run Code Online (Sandbox Code Playgroud)
同样,写入CSV成功完成 - 一个657MB的文件 - 但从CSV读取永远不会完成.
如何估计一个657MB的CSV文件就足以读取多少RAM,因为2GB看起来还不够?
感觉就像我缺少对DataFrames或psycopg2的一些基本理解,但是我被卡住了,我甚至无法确定瓶颈或优化的位置.
从远程(postgres)服务器提取大量数据的正确策略是什么?
我怀疑这里有一些(相关的)因素导致了速度缓慢:
read_sql是用 python 编写的,所以有点慢(特别是与read_csv用 cython 编写的 相比,并且为了速度而精心实现!)并且它依赖于 sqlalchemy 而不是一些(可能更快)C-DBAPI。迁移到 sqlalchmey 的动力是为了使将来的迁移变得更容易(以及跨 SQL 平台支持)。我认为直接的解决方案是基于块的方法(并且有一个功能请求要求read_sql在 pandas和中本地工作read_sql_table)。
编辑:从 Pandas v0.16.2 开始,这种基于块的方法是在read_sql.
由于您使用的是 postgres,因此您可以访问LIMIT 和 OFFSET 查询,这使得分块变得非常容易。(我的想法是否正确,这些并非在所有 sql 语言中都可用?)
首先,获取表中的行数(或估计值):
nrows = con.execute('SELECT count(*) FROM users').fetchone()[0] # also works with an sqlalchemy engine
Run Code Online (Sandbox Code Playgroud)
使用它来迭代表(为了调试,您可以添加一些打印语句来确认它正在工作/没有崩溃!),然后合并结果:
def read_sql_chunked(query, con, nrows, chunksize=1000):
start = 1
dfs = [] # Note: could probably make this neater with a generator/for loop
while start < nrows:
df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), con)
dfs.append(df)
return pd.concat(dfs, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)
注意:这假设数据库适合内存!如果没有,您将需要处理每个块(mapreduce 样式)...或投资更多内存!