如何提高python读取多个csv文件的速度

Mar*_*nto 1 python csv dataframe pandas dask

这是我第一次创建处理包含大量数据的文件的代码,所以我有点卡在这里。

\n

我想要做的是读取路径列表,列出所有需要读取的 csv 文件,从每个文件中检索 HEAD 和 TAIL 并将其放入列表中。

\n

我总共有 621 个 csv 文件,每个文件由 5800 行和 251 列组成\n在此输入图像描述\n这是数据样本

\n
[LOGGING],RD81DL96_1,3,4,5,2,,,,\nLOG01,,,,,,,,,\nDATETIME,INDEX,SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0]\nTIME,INDEX,FF-1(\xef\xbc\x91A) ,FF-1(\xef\xbc\x91B) ,FF-1(\xef\xbc\x91C) ,FF-1(\xef\xbc\x92A),FF-2(\xef\xbc\x91A) ,FF-2(\xef\xbc\x91B) ,FF-2(\xef\xbc\x91C),FF-2(\xef\xbc\x92A)\n47:29.6,1,172,0,139,1258,0,0,400,0\n47:34.6,2,172,0,139,1258,0,0,400,0\n47:39.6,3,172,0,139,1258,0,0,400,0\n47:44.6,4,172,0,139,1263,0,0,400,0\n47:49.6,5,172,0,139,1263,0,0,450,0\n47:54.6,6,172,0,139,1263,0,0,450,0\n
Run Code Online (Sandbox Code Playgroud)\n

问题是,虽然读取所有文件花了大约 13 秒(老实说还是有点慢)

\n

但是当我添加一行追加代码时,这个过程花了很多时间才能完成,大约需要 4 分钟。

\n

以下是代码片段:

\n
# CsvList: [File Path, Change Date, File size, File Name]\nfor x, file in enumerate(CsvList):\n     timeColumn = [\'TIME\']\n     df = dd.read_csv(file[0], sep =\',\', skiprows = 3, encoding= \'CP932\', engine=\'python\', usecols=timeColumn)\n\n     # The process became long when this code is added\n     startEndList.append(list(df.head(1)) + list(df.tail(1))) \n
Run Code Online (Sandbox Code Playgroud)\n

为什么会这样?我正在使用 dask.dataframe

\n

pav*_*aes 5

目前,您的代码并未真正利用 Dask 的并行化功能,因为:

  • df.head调用df.tail将触发“计算”(即,将 Dask DataFrame 转换为 pandas DataFrame——这是我们在 Dask 的惰性计算中尝试最小化的),并且
  • for 循环按顺序运行,因为您正在创建 Dask DataFrame 并将它们转换为 pandas DataFrame,所有这些都在循环内。

因此,您当前的示例类似于在 for 循环中使用 pandas,但增加了 Dask 到 pandas 的转换开销。

由于您需要处理每个文件,因此我建议您查看Dask Delayed,它在这里可能更优雅+有用。以下(伪代码)将并行化每个文件上的 pandas 操作:

import dask
import pandas as pd

for file in list_of_files:
    df = dask.delayed(pd.read_csv)(file)
    result.append(df.head(1) + df.tail(1))

dask.compute(*result)
Run Code Online (Sandbox Code Playgroud)

dask.visualize(*result)当我使用 4 个 csv 文件时的输出确认了并行性:

在此输入图像描述

如果你真的想在这里使用 Dask DataFrame,你可以尝试:

  • 将所有文件读入单个 Dask DataFrame,
  • 确保每个 Dask“分区”对应一个文件,
  • 使用 Dask Dataframeapply获取headtail值并将它们附加到新列表中
  • 在新列表上调用计算