Gab*_*nte 15 python dataframe pandas dask
对于我的应用程序,我需要读取每个15 M行的多个文件,将它们存储在DataFrame中,然后以HDFS5格式保存DataFrame。
我已经尝试了不同的方法,特别是punks.read_csv(具有chunksize和dtype规范)和dask.dataframe。它们都需要大约90秒才能处理1个文件,因此我想知道是否有一种方法可以按所述方式有效处理这些文件。在下面的代码中,我展示了一些我已经完成的测试代码。
import pandas as pd
import dask.dataframe as dd
import numpy as np
import re
# First approach
store = pd.HDFStore('files_DFs.h5')
chunk_size = 1e6
df_chunk = pd.read_csv(file,
sep="\t",
chunksize=chunk_size,
usecols=['a', 'b'],
converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
"b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
skiprows=15
)
chunk_list = []
for chunk in df_chunk:
chunk_list.append(chunk)
df = pd.concat(chunk_list, ignore_index=True)
store[dfname] = df
store.close()
# Second approach
df = dd.read_csv(
file,
sep="\t",
usecols=['a', 'b'],
converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
"b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
skiprows=15
)
store.put(dfname, df.compute())
store.close()
Run Code Online (Sandbox Code Playgroud)
这是文件的外观(空白由文字标签组成):
a b
599.998413 14.142895
599.998413 20.105534
599.998413 6.553850
599.998474 27.116098
599.998474 13.060312
599.998474 13.766775
599.998596 1.826706
599.998596 18.275938
599.998718 20.797491
599.998718 6.132450)
599.998718 41.646194
599.998779 19.145775
Run Code Online (Sandbox Code Playgroud)
首先,让我们回答问题的标题
我建议您使用modin:
import modin.pandas as mpd
import pandas as pd
import numpy as np
frame_data = np.random.randint(0, 10_000_000, size=(15_000_000, 2))
pd.DataFrame(frame_data*0.0001).to_csv('15mil.csv', header=False)
Run Code Online (Sandbox Code Playgroud)
!wc 15mil*.csv ; du -h 15mil*.csv
15000000 15000000 480696661 15mil.csv
459M 15mil.csv
Run Code Online (Sandbox Code Playgroud)
%%timeit -r 3 -n 1 -t
global df1
df1 = pd.read_csv('15mil.csv', header=None)
9.7 s ± 95.1 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)
%%timeit -r 3 -n 1 -t
global df2
df2 = mpd.read_csv('15mil.csv', header=None)
3.07 s ± 685 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)
(df2.values == df1.values).all()
True
Run Code Online (Sandbox Code Playgroud)
因此,正如我们所见,modin 在我的设置上快了大约3倍。
现在回答您的特定问题
正如人们已经指出的那样,您的瓶颈可能是转换器。您称这些lambda为3000万次。在这种规模下,甚至函数调用的开销也变得微不足道。
让我们来解决这个问题。
!sed 's/.\{4\}/&)/g' 15mil.csv > 15mil_dirty.csv
Run Code Online (Sandbox Code Playgroud)
首先,我尝试将modin与converters参数一起使用。然后,我尝试了另一种调用regexp的方法:
首先,我将创建一个类似于File的对象,该对象通过您的regexp过滤所有内容:
%%timeit -r 3 -n 1 -t
global df2
df2 = mpd.read_csv('15mil.csv', header=None)
3.07 s ± 685 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)
然后,将其作为read_csv中的第一个参数传递给pandas:
with open('15mil_dirty.csv') as file:
df2 = pd.read_csv(FilterFile(file))
Run Code Online (Sandbox Code Playgroud)
(df2.values == df1.values).all()
True
Run Code Online (Sandbox Code Playgroud)
!sed 's/.\{4\}/&)/g' 15mil.csv > 15mil_dirty.csv
Run Code Online (Sandbox Code Playgroud)
class FilterFile():
def __init__(self, file):
self.file = file
def read(self, n):
return re.sub(r"[^\d.,\n]", "", self.file.read(n))
def write(self, *a): return self.file.write(*a) # needed to trick pandas
def __iter__(self, *a): return self.file.__iter__(*a) # needed
Run Code Online (Sandbox Code Playgroud)
好像莫丁又赢了!不幸的是,modin尚未实现从缓冲区读取的功能,因此我设计了最终方法。
with open('15mil_dirty.csv') as file:
df2 = pd.read_csv(FilterFile(file))
Run Code Online (Sandbox Code Playgroud)
这使用的translate速度比快得多re.sub,也使用的/dev/shm是ubuntu(和其他linux)通常提供的内存文件系统。写入那里的任何文件都永远不会进入磁盘,因此速度很快。最后,它使用modin读取文件,以解决modin的缓冲区限制。这种方法比您的方法快30倍左右,而且非常简单。