将大量数据从远程服务器拉入DataFrame

Mar*_*tuc 8 python postgresql psycopg2 pandas

为了尽可能多地提供上下文,我试图将存储在远程postgres服务器(heroku)上的一些数据放入pandas DataFrame中,使用psycopg2进行连接.

我对两个特定的表,用户事件感兴趣,并且连接工作正常,因为在下拉用户数据时

import pandas.io.sql as sql 
# [...]
users = sql.read_sql("SELECT * FROM users", conn)
Run Code Online (Sandbox Code Playgroud)

等待几秒钟后,DataFrame按预期返回.

<class 'pandas.core.frame.DataFrame'>
Int64Index: 67458 entries, 0 to 67457
Data columns (total 35 columns): [...]
Run Code Online (Sandbox Code Playgroud)

然而,当试图直接从ipython中提取更大,更重的事件数据时,经过很长一段时间,它只会崩溃:

In [11]: events = sql.read_sql("SELECT * FROM events", conn)
vagrant@data-science-toolbox:~$
Run Code Online (Sandbox Code Playgroud)

当从iPython笔记本尝试时,我得到死核心错误

内核已经死了,你想重新启动吗?如果不重新启动内核,则可以保存笔记本,但在重新打开笔记本之前,运行代码将无法运行.


更新#1:

为了更好地了解我想要引入的事件表的大小,这里是记录的数量和每个的属性数量:

In [11]: sql.read_sql("SELECT count(*) FROM events", conn)
Out[11]:
     count
0  2711453

In [12]: len(sql.read_sql("SELECT * FROM events LIMIT 1", conn).columns)
Out[12]: 18
Run Code Online (Sandbox Code Playgroud)

更新#2:

内存绝对是当前实现的瓶颈read_sql:当拉下事件并尝试运行另一个iPython实例时,结果是

vagrant@data-science-toolbox:~$ sudo ipython
-bash: fork: Cannot allocate memory
Run Code Online (Sandbox Code Playgroud)

更新#3:

我首先尝试了一个read_sql_chunked只返回部分DataFrames数组的实现:

def read_sql_chunked(query, conn, nrows, chunksize=1000):
    start = 0
    dfs = []
    while start < nrows:
        df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), conn)
        start += chunksize
        dfs.append(df)
        print "Events added: %s to %s of %s" % (start-chunksize, start, nrows)
    # print "concatenating dfs"
    return dfs

event_dfs = read_sql_chunked("SELECT * FROM events", conn, events_count, 100000)
Run Code Online (Sandbox Code Playgroud)

并且运行良好,但在尝试连接DataFrame时,内核会再次死亡.
这是在给VM 2GB的RAM之后.

基于Andy read_sqlread_csv实现和性能差异的解释,接下来我尝试将记录附加到CSV中,然后将它们全部读入DataFrame:

event_dfs[0].to_csv(path+'new_events.csv', encoding='utf-8')

for df in event_dfs[1:]:
    df.to_csv(path+'new_events.csv', mode='a', header=False, encoding='utf-8')
Run Code Online (Sandbox Code Playgroud)

同样,写入CSV成功完成 - 一个657MB的文件 - 但从CSV读取永远不会完成.

如何估计一个657MB的CSV文件就足以读取多少RAM,因为2GB看起来还不够?


感觉就像我缺少对DataFrames或psycopg2的一些基本理解,但是我被卡住了,我甚至无法确定瓶颈或优化的位置.

从远程(postgres)服务器提取大量数据的正确策略是什么?

And*_*den 5

我怀疑这里有一些(相关的)因素导致了速度缓慢:

  1. read_sql是用 python 编写的,所以有点慢(特别是与read_csv用 cython 编写的 相比,并且为了速度而精心实现!)并且它依赖于 sqlalchemy 而不是一些(可能更快)C-DBAPI。迁移到 sqlalchmey 的动力是为了使将来的迁移变得更容易(以及跨 SQL 平台支持)。
  2. 您可能会耗尽内存,因为内存中有太多 python 对象(这与不使用 C-DBAPI 有关),但可能可以解决...

我认为直接的解决方案是基于块的方法(并且有一个功能请求要求read_sql在 pandas和中本地工作read_sql_table)。

编辑:从 Pandas v0.16.2 开始,这种基于块的方法是在read_sql.


由于您使用的是 postgres,因此您可以访问LIMIT 和 OFFSET 查询,这使得分块变得非常容易。(我的想法是否正确,这些并非在所有 sql 语言中都可用?)

首先,获取表中的行数(或估计值):

nrows = con.execute('SELECT count(*) FROM users').fetchone()[0]  # also works with an sqlalchemy engine
Run Code Online (Sandbox Code Playgroud)

使用它来迭代表(为了调试,您可以添加一些打印语句来确认它正在工作/没有崩溃!),然后合并结果:

def read_sql_chunked(query, con, nrows, chunksize=1000):
    start = 1
    dfs = []  # Note: could probably make this neater with a generator/for loop
    while start < nrows:
        df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), con)
        dfs.append(df)
    return pd.concat(dfs, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)

注意:这假设数据库适合内存!如果没有,您将需要处理每个块(mapreduce 样式)...或投资更多内存!