以高效的内存方式将大型csv读入稀疏的pandas数据帧

Joh*_*ler 24 python numpy scipy pandas dask

pandas read_csv函数似乎没有稀疏选项.我有csv数据,其中有大量零(它压缩得非常好,并且剥离任何0值都会将其减少到几乎原始大小的一半).

我已经尝试将它加载到密集矩阵中read_csv然后调用to_sparse,但是它需要很长时间并且在文本字段上窒息,尽管大多数数据都是浮点数.如果我pandas.get_dummies(df)先调用将分类列转换为1和0,那么调用to_sparse(fill_value=0)它需要花费大量时间,比我预期的大得多的数字表有更长的时间,大多数为零.即使我从原始文件中删除零并调用to_sparse()(以使填充值为NaN),也会发生这种情况.这也恰好不管我是否通过kind='block'kind='integer'.

除了手工构建稀疏数据帧之外,是否有一种好的,平滑的方式可以直接加载稀疏的csv而不会占用大量不必要的内存?


下面是一些代码,用于创建具有3列浮点数据和一列文本数据的样本数据集.大约85%的浮点值为零,CSV的总大小约为300 MB,但您可能希望将其放大以真正测试内存约束.

np.random.seed(123)
df=pd.DataFrame( np.random.randn(10000000,3) , columns=list('xyz') )
df[ df < 1.0 ] = 0.0
df['txt'] = np.random.choice( list('abcdefghij'), size=len(df) )
df.to_csv('test.csv',index=False)
Run Code Online (Sandbox Code Playgroud)

这是一种简单的阅读方式,但希望有更好,更有效的方法:

sdf = pd.read_csv( 'test.csv', dtype={'txt':'category'} ).to_sparse(fill_value=0.0)
Run Code Online (Sandbox Code Playgroud)

编辑添加(来自JohnE): 如果可能,请在答案中提供有关读取大型CSV的一些相对性能统计数据,包括有关如何测量内存效率的信息(特别是因为内存效率比时钟时间更难测量).特别要注意的是,如果内存效率更高,那么较慢的(时钟时间)答案可能是最佳答案.

jak*_*vdp 16

我可能会通过使用dask以流式方式加载数据来解决这个问题.例如,您可以按如下方式创建一个dask数据帧:

import dask.dataframe as ddf
data = ddf.read_csv('test.csv')
Run Code Online (Sandbox Code Playgroud)

此时此data对象实际上没有做任何事情; 它只包含一个"配方",可以在可管理的块中从磁盘读取数据帧.如果要实现数据,可以调用compute():

df = data.compute().reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)

此时,您有一个标准的pandas数据帧(我们调用reset_index因为默认情况下每个分区都是独立索引的).结果等同于您通过pd.read_csv直接调用获得的结果:

df.equals(pd.read_csv('test.csv'))
# True
Run Code Online (Sandbox Code Playgroud)

dask的好处是你可以为这个"配方"添加指令来构建你的数据帧; 例如,您可以按如下方式使数据的每个分区稀疏:

data = data.map_partitions(lambda part: part.to_sparse(fill_value=0))
Run Code Online (Sandbox Code Playgroud)

此时,调用compute()将构造一个稀疏数组:

df = data.compute().reset_index(drop=True)
type(df)
# pandas.core.sparse.frame.SparseDataFrame
Run Code Online (Sandbox Code Playgroud)

剖析

为了检查dask方法与原始pandas方法的比较,让我们进行一些线性分析.我将使用lprunmprun,作为描述在这里(全面披露:这是我自己的书的部分).

假设您正在使用Jupyter笔记本,您可以这样运行:

首先,使用我们想要执行的基本任务创建一个单独的文件:

%%file dask_load.py

import numpy as np
import pandas as pd
import dask.dataframe as ddf

def compare_loads():
    df = pd.read_csv('test.csv')
    df_sparse = df.to_sparse(fill_value=0)

    df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    df_dask = df_dask.compute().reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)

接下来让我们对计算时间进行逐行分析:

%load_ext line_profiler

from dask_load import compare_loads
%lprun -f compare_loads compare_loads()
Run Code Online (Sandbox Code Playgroud)

我得到以下结果:

Timer unit: 1e-06 s

Total time: 13.9061 s
File: /Users/jakevdp/dask_load.py
Function: compare_loads at line 6

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     6                                           def compare_loads():
     7         1      4746788 4746788.0     34.1      df = pd.read_csv('test.csv')
     8         1       769303 769303.0      5.5      df_sparse = df.to_sparse(fill_value=0)
     9                                           
    10         1        33992  33992.0      0.2      df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11         1         7848   7848.0      0.1      df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12         1      8348217 8348217.0     60.0      df_dask = df_dask.compute().reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)

我们看到大约60%的时间花在了dask调用上,而大约40%的时间花在了上面的示例数组的pandas调用中.这告诉我们dask比这个任务的pandas慢约50%:这是可以预料到的,因为数据分区的分块和重组会导致一些额外的开销.

dask闪耀在内存使用中:让我们mprun用来做逐行内存配置文件:

%load_ext memory_profiler
%mprun -f compare_loads compare_loads()
Run Code Online (Sandbox Code Playgroud)

我机器上的结果如下:

Filename: /Users/jakevdp/dask_load.py

Line #    Mem usage    Increment   Line Contents
================================================
     6     70.9 MiB     70.9 MiB   def compare_loads():
     7    691.5 MiB    620.6 MiB       df = pd.read_csv('test.csv')
     8    828.8 MiB    137.3 MiB       df_sparse = df.to_sparse(fill_value=0)
     9                             
    10    806.3 MiB    -22.5 MiB       df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11    806.4 MiB      0.1 MiB       df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12    947.9 MiB    141.5 MiB       df_dask = df_dask.compute().reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)

我们看到最终的pandas数据帧大小约为140MB,但是熊猫一路上使用大约620MB,因为它将数据读入临时密集对象.

另一方面,dask在加载数组和构造最终稀疏结果时仅使用~140MB.如果您正在读取其密集大小与系统中可用内存相当的数据,则dask具有明显的优势,尽管计算时间缩短了约50%.


但是对于处理大数据,你不应该停在这里.据推测,您正在对数据进行一些操作,并且dask数据帧抽象允许您在实现数据之前执行这些操作(即将它们添加到"配方").因此,如果您对数据所做的事情涉及算术,聚合,分组等,您甚至不需要担心稀疏存储:只需使用dask对象执行这些操作,最后调用compute(),并且dask将采用注意以记忆效率的方式应用它们.

因此,例如,我可以max()使用dask数据帧计算每列的内容,而无需立即将整个内容加载到内存中:

>>> data.max().compute()
x      5.38114
y      5.33796
z      5.25661
txt          j
dtype: object
Run Code Online (Sandbox Code Playgroud)

直接使用dask数据帧可以避免对数据表示的担忧,因为您可能永远不必一次将所有数据加载到内存中.

祝你好运!


Joh*_*hnE 9

这是一个主要作为基准提供的答案.希望有比这更好的方法.

chunksize = 1000000       # perhaps try some different values here?
chunks = pd.read_csv( 'test.csv', chunksize=chunksize, dtype={'txt':'category'} )
sdf = pd.concat( [ chunk.to_sparse(fill_value=0.0) for chunk in chunks ] )
Run Code Online (Sandbox Code Playgroud)

正如@acushner所说,你可以改为将其作为生成器表达式:

sdf = pd.concat( chunk.to_sparse(fill_value=0.0) for chunk in chunks )
Run Code Online (Sandbox Code Playgroud)

虽然在我的测试中我没有看到任何大的差异,但也许您可能使用不同的数据,似乎已经达成共识,这比列表补偿方式更好.

我希望报告一些关于各种方法的内存分析,但我很难得到一致的结果,我怀疑因为python总是在后台清理内存,导致一些随机噪声被添加到结果中.(在对Jake的回答的评论中,他建议在每个之前重新启动jupyter内核%memit以获得更一致的结果,但我还没有尝试过.)

但我一直发现(使用%%memit)上面的分块和@jakevdp的dask方法都使用了一些非常粗略的内存附近的东西作为OP中的天真方法.有关分析的更多信息,请查看Jake的书"Python数据科学手册"中的"分析和时序代码".