使用 Dask 高效地按部分读取大 csv 文件

Mik*_*Sam 5 python csv dask dask-dataframe

现在,我正在使用 Dask 读取大型 csv 文件,并对其进行一些后处理(例如,进行一些数学运算,然后通过某些 ML 模型进行预测并将结果写入数据库)。避免将所有数据加载到内存中,我想按当前大小的块读取:读取第一个块,预测,写入,读取第二个块等。

我使用skiprowsand尝试了下一个解决方案nrows

import dask.dataframe as dd
read_path = "medium.csv"

# Read by chunk
skiprows = 100000
nrows = 50000
res_df = dd.read_csv(read_path, skiprows=skiprows)
res_df = res_df.head(nrows)

print(res_df.shape)
print(res_df.head())
Run Code Online (Sandbox Code Playgroud)

但我收到错误:

ValueError:样本不够大,无法包含至少一行数据。sample请增加调用中的 字节数read_csv/read_table

另外,据我了解,它每次都会为所有数据计算二进制掩码([False,False,...,True,...])以查找要加载的行。我们怎样才能更有效地做到这一点?也许使用 dask 中的一些分布式或延迟函数?

MRo*_*lin 4

Dask dataframe 会帮你分区数据,你不需要使用nrows/skip_rows

df = dd.read_csv(filename)
Run Code Online (Sandbox Code Playgroud)

如果你想选择一个特定的分区,那么你可以使用分区访问器

part = df.partitions[i]
Run Code Online (Sandbox Code Playgroud)

但是,您可能还想并行应用您的函数。

df.map_partitions(process).to_csv("data.*.csv")
Run Code Online (Sandbox Code Playgroud)