如何在Dask中重置连接数据帧的索引

Bil*_*ill 2 python indexing concatenation dataframe dask

我是 Dask 的新手,并认为这将是一项简单的任务。我想从多个 csv 文件加载数据并将其合并到一个 Dask 数据帧中。在本例中,有 5 个 csv 文件,每个文件有 10,000 行数据。显然我想给组合的数据框一个唯一的索引。

所以我这样做了:

import dask.dataframe as dd

# Define Dask computations
dataframes = [
    dd.read_csv(os.path.join(data_dir, filename)).set_index('Unnamed: 0')
    for filename in os.listdir(data_dir) if filename.endswith('.csv')
]

combined_df = dd.concat(dataframes).reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)

如果我这样做,combined_df.head().index我会按预期得到这个:

RangeIndex(start=0, stop=5, step=1)
Run Code Online (Sandbox Code Playgroud)

combined_df.tail().index并不如预期:

RangeIndex(start=3252, stop=3257, step=1)
Run Code Online (Sandbox Code Playgroud)

进一步检查发现索引值combined_df由 15 个独立的系列组成,长度大约为 3256,加起来总长度为 50000。请注意,csv 文件在第一列中都包含一个从 0 到 10000 的索引。

这里发生了什么,我如何获得一个从 0 到 50000 的标准整数索引,它是所有 csv 文件中总行数的总和?

背景

如果您需要测试上面的代码,这里有一个设置脚本来创建一些 csv 文件:

import os
import numpy as np
import pandas as pd

# Create 5 large csv files (could be too big to fit all in memory)
shape = (10000, 1000)

data_dir = 'data'
if not os.path.exists(data_dir):
    os.mkdir(data_dir)

for i in range(5):
    filepath = os.path.join(data_dir, f'datafile_{i:02d}.csv')
    if not os.path.exists(filepath):
        data = (i + 1) * np.random.randn(shape[0], shape[1])
        print(f"Array {i} size in memory: {data.nbytes*1e-6:.2f} MB")
        pd.DataFrame(data).to_csv(filepath)
Run Code Online (Sandbox Code Playgroud)

更新:

这种方法似乎出现了同样的问题:

combined_df = dd.read_csv(os.path.join(data_dir, '*.csv'))
print(dd.compute(combined_df.tail().index)[0])
print(dd.compute(combined_df.reset_index(drop=True).tail().index)[0])

RangeIndex(start=3252, stop=3257, step=1)
RangeIndex(start=3252, stop=3257, step=1)
Run Code Online (Sandbox Code Playgroud)

在我看来reset_index方法产生相同的索引。

Val*_*_Bo 5

dask版本中,reset_index 在每个分区上单独(并同时)执行其任务,因此索引中的连续数字“重新启动”作为一些点,实际上是在每个分区的开头。

要规避此限制,您可以:

  • 分配一个填充为1的新列。
  • 将索引设置为cumsum() - 1在此列上计算(幸运的是,与reset_index相反,cumsum是在整个 DataFrame上计算的)。

一个副作用是索引的名称现在是这个新列的名称。如果要清除它,则必须在分区级别进行,调用 map_partitions

所以整个代码可以是:

ddf = ddf.assign(idx=1)
ddf = ddf.set_index(ddf.idx.cumsum() - 1)
ddf = ddf.map_partitions(lambda df: df.rename(index = {'idx': None}))
Run Code Online (Sandbox Code Playgroud)

请注意,assign(idx=1)是可以的,因为这个明显的单个值被 广播到整个 DataFrame 的长度,所以 这个新列中的每个元素都将设置为1,而我不知道 DataFrame 包含多少行。这是底层Numpy包的强大功能之一,它大大简化了NumpyPandasdask中的编程

然后你可以运行:ddf.compute()查看结果。