Mar*_*nto 1 python csv dataframe pandas dask
这是我第一次创建处理包含大量数据的文件的代码,所以我有点卡在这里。
\n我想要做的是读取路径列表,列出所有需要读取的 csv 文件,从每个文件中检索 HEAD 和 TAIL 并将其放入列表中。
\n我总共有 621 个 csv 文件,每个文件由 5800 行和 251 列组成\n
\n这是数据样本
[LOGGING],RD81DL96_1,3,4,5,2,,,,\nLOG01,,,,,,,,,\nDATETIME,INDEX,SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0]\nTIME,INDEX,FF-1(\xef\xbc\x91A) ,FF-1(\xef\xbc\x91B) ,FF-1(\xef\xbc\x91C) ,FF-1(\xef\xbc\x92A),FF-2(\xef\xbc\x91A) ,FF-2(\xef\xbc\x91B) ,FF-2(\xef\xbc\x91C),FF-2(\xef\xbc\x92A)\n47:29.6,1,172,0,139,1258,0,0,400,0\n47:34.6,2,172,0,139,1258,0,0,400,0\n47:39.6,3,172,0,139,1258,0,0,400,0\n47:44.6,4,172,0,139,1263,0,0,400,0\n47:49.6,5,172,0,139,1263,0,0,450,0\n47:54.6,6,172,0,139,1263,0,0,450,0\nRun Code Online (Sandbox Code Playgroud)\n问题是,虽然读取所有文件花了大约 13 秒(老实说还是有点慢)
\n但是当我添加一行追加代码时,这个过程花了很多时间才能完成,大约需要 4 分钟。
\n以下是代码片段:
\n# CsvList: [File Path, Change Date, File size, File Name]\nfor x, file in enumerate(CsvList):\n timeColumn = [\'TIME\']\n df = dd.read_csv(file[0], sep =\',\', skiprows = 3, encoding= \'CP932\', engine=\'python\', usecols=timeColumn)\n\n # The process became long when this code is added\n startEndList.append(list(df.head(1)) + list(df.tail(1))) \nRun Code Online (Sandbox Code Playgroud)\n为什么会这样?我正在使用 dask.dataframe
\n目前,您的代码并未真正利用 Dask 的并行化功能,因为:
df.head调用df.tail将触发“计算”(即,将 Dask DataFrame 转换为 pandas DataFrame——这是我们在 Dask 的惰性计算中尝试最小化的),并且因此,您当前的示例类似于在 for 循环中使用 pandas,但增加了 Dask 到 pandas 的转换开销。
由于您需要处理每个文件,因此我建议您查看Dask Delayed,它在这里可能更优雅+有用。以下(伪代码)将并行化每个文件上的 pandas 操作:
import dask
import pandas as pd
for file in list_of_files:
df = dask.delayed(pd.read_csv)(file)
result.append(df.head(1) + df.tail(1))
dask.compute(*result)
Run Code Online (Sandbox Code Playgroud)
dask.visualize(*result)当我使用 4 个 csv 文件时的输出确认了并行性:
如果你真的想在这里使用 Dask DataFrame,你可以尝试:
apply获取head和tail值并将它们附加到新列表中| 归档时间: |
|
| 查看次数: |
1965 次 |
| 最近记录: |