Bil*_*ill 2 python indexing concatenation dataframe dask
我是 Dask 的新手,并认为这将是一项简单的任务。我想从多个 csv 文件加载数据并将其合并到一个 Dask 数据帧中。在本例中,有 5 个 csv 文件,每个文件有 10,000 行数据。显然我想给组合的数据框一个唯一的索引。
所以我这样做了:
import dask.dataframe as dd
# Define Dask computations
dataframes = [
dd.read_csv(os.path.join(data_dir, filename)).set_index('Unnamed: 0')
for filename in os.listdir(data_dir) if filename.endswith('.csv')
]
combined_df = dd.concat(dataframes).reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)
如果我这样做,combined_df.head().index我会按预期得到这个:
RangeIndex(start=0, stop=5, step=1)
Run Code Online (Sandbox Code Playgroud)
但combined_df.tail().index并不如预期:
RangeIndex(start=3252, stop=3257, step=1)
Run Code Online (Sandbox Code Playgroud)
进一步检查发现索引值combined_df由 15 个独立的系列组成,长度大约为 3256,加起来总长度为 50000。请注意,csv 文件在第一列中都包含一个从 0 到 10000 的索引。
这里发生了什么,我如何获得一个从 0 到 50000 的标准整数索引,它是所有 csv 文件中总行数的总和?
背景
如果您需要测试上面的代码,这里有一个设置脚本来创建一些 csv 文件:
import os
import numpy as np
import pandas as pd
# Create 5 large csv files (could be too big to fit all in memory)
shape = (10000, 1000)
data_dir = 'data'
if not os.path.exists(data_dir):
os.mkdir(data_dir)
for i in range(5):
filepath = os.path.join(data_dir, f'datafile_{i:02d}.csv')
if not os.path.exists(filepath):
data = (i + 1) * np.random.randn(shape[0], shape[1])
print(f"Array {i} size in memory: {data.nbytes*1e-6:.2f} MB")
pd.DataFrame(data).to_csv(filepath)
Run Code Online (Sandbox Code Playgroud)
更新:
这种方法似乎出现了同样的问题:
combined_df = dd.read_csv(os.path.join(data_dir, '*.csv'))
print(dd.compute(combined_df.tail().index)[0])
print(dd.compute(combined_df.reset_index(drop=True).tail().index)[0])
RangeIndex(start=3252, stop=3257, step=1)
RangeIndex(start=3252, stop=3257, step=1)
Run Code Online (Sandbox Code Playgroud)
在我看来reset_index方法产生相同的索引。
在dask版本中,reset_index 在每个分区上单独(并同时)执行其任务,因此索引中的连续数字“重新启动”作为一些点,实际上是在每个分区的开头。
要规避此限制,您可以:
一个副作用是索引的名称现在是这个新列的名称。如果要清除它,则必须在分区级别进行,调用 map_partitions。
所以整个代码可以是:
ddf = ddf.assign(idx=1)
ddf = ddf.set_index(ddf.idx.cumsum() - 1)
ddf = ddf.map_partitions(lambda df: df.rename(index = {'idx': None}))
Run Code Online (Sandbox Code Playgroud)
请注意,assign(idx=1)是可以的,因为这个明显的单个值被 广播到整个 DataFrame 的长度,所以 这个新列中的每个元素都将设置为1,而我不知道 DataFrame 包含多少行。这是底层Numpy包的强大功能之一,它大大简化了Numpy、Pandas和dask中的编程。
然后你可以运行:ddf.compute()查看结果。
| 归档时间: |
|
| 查看次数: |
1154 次 |
| 最近记录: |