可以dask parralelize读取一个csv文件?

Mag*_*n88 13 python csv pandas dask

我正在将大文本文件转换为hdf存储,希望能够更快地访问数据.转换工作正常,但是从csv文件读取并不是并行完成的.它真的很慢(对于SSD上的1GB文本文件需要大约30分钟,所以我的猜测是它不受IO限制).

有没有办法让它在parralel的多个线程中读取?Sice可能很重要,我目前被迫在Windows下运行 - 以防万一有所不同.

from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df.categorize([ 'Type',
                'Condition',               
          ])

df.to_hdf("data/data.hdf", "Measurements", 'w')
Run Code Online (Sandbox Code Playgroud)

mgo*_*ser 13

借助@MRocklin 的回答,在较新版本的 dask 中,您可以使用df.compute(scheduler='processes')df.compute(scheduler='threads')使用多处理或多线程转换为熊猫:

from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df = df.compute(scheduler='processes')     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
Run Code Online (Sandbox Code Playgroud)

  • 嗨@edesz - 线程共享内存,并受制于 GIL(全局解释器锁),而进程作为单独的进程运行并具有额外的开销。通常,由于 GIL 的限制,多线程在 Python 中不能很好地工作,除非任务是 IO 绑定的(例如,如果每个任务都在下载文件)。如果您不确定,请尝试两者,看看哪个更快。 (3认同)

MRo*_*lin 11

是的,dask.dataframe可以并行读取.但是你遇到了两个问题:

Pandas.read_csv仅部分释放GIL

默认情况下,dask.dataframe与线程并行化,因为大多数Pandas可以在多个线程中并行运行(释放GIL).Pandas.read_csv是一个例外,特别是如果您生成的数据帧使用对象dtypes作为文本

dask.dataframe.to_hdf(filename)强制进行顺序计算

写入单个HDF文件将强制执行顺序计算(并行写入单个文件非常困难.)

编辑:新解决方案

今天我会避免使用HDF并使用Parquet.我可能会使用多处理或dask.distributed调度程序来避免单个机器上的GIL问题.这两者的组合应该为您提供完整的线性缩放.

from dask.distributed import Client
client = Client()

df = dask.dataframe.read_csv(...)
df.to_parquet(...)
Run Code Online (Sandbox Code Playgroud)

因为您的数据集可能适合内存,所以使用dask.dataframe.read_csv与多个进程并行加载,然后立即切换到Pandas.

import dask.dataframe as ddf
import dask.multiprocessing

df = ddf.read_csv("data/Measurements*.csv",  # read in parallel
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df = df.compute(get=dask.multiprocessing.get)     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
Run Code Online (Sandbox Code Playgroud)

  • 你有可能以两种方式扩展这个答案吗?首先,我的.csv不适合内存.第二个也许更复杂的是.csv文件是压缩的,Dask目前不支持.这个讨论中指针(https://github.com/dask/dask/issues/2554)到`dask.delayed`但是我不确定如何将它与`pd.read_csv`和`chunksize`结合使用.谢谢! (4认同)