如何在列上使用 dask groupby 分隔文件

RTM*_*RTM 6 python pandas dask

我有一大组 csv 文件 ( file_1.csv, file_2.csv),按时间段分隔,无法放入内存。每个文件都将采用下面提到的格式。


| instrument | time | code     | val           |
|------------|------|----------|---------------|
| 10         | t1   | c1_at_t1 | v_of_c1_at_t1 |
| 10         | t1   | c2_at_t1 | v_of_c2_at_t1 |
| 10         | t2   | c1_at_t2 | v_of_c1_at_t2 |
| 10         | t2   | c3_at_t2 | v_of_c3_at_t2 |
| 11         | t1   | c4_at_t1 | v_of_c4_at_t1 |
| 11         | t1   | c5_at_t1 | v_of_c5_at_t1 |
| 12         | t2   | c6_at_t2 | v_of_c6_at_t2 |
| 13         | t3   | c9_at_t3 | v_of_c9_at_t3 |
Run Code Online (Sandbox Code Playgroud)

每个文件都是关于格式一致的仪器日志。有一组工具可以code在给定的时间戳(time)发出不同的代码()。的该值code在给定的一个time适合特定的工具被保存在val

我想file_1.csv使用instrument 列(例如:)拆分每个文件(例如:)10,然后10在所有文件(file_1.csvfile_2.csv)中加入为仪器(例如:)提取的文件

我正在考虑daskinstrument列上使用groupby 操作。是否有任何替代或更好的方法来代替使用groupby或更好的方法来提取文件instrument

我为执行上述操作而编写的代码是


| instrument | time | code     | val           |
|------------|------|----------|---------------|
| 10         | t1   | c1_at_t1 | v_of_c1_at_t1 |
| 10         | t1   | c2_at_t1 | v_of_c2_at_t1 |
| 10         | t2   | c1_at_t2 | v_of_c1_at_t2 |
| 10         | t2   | c3_at_t2 | v_of_c3_at_t2 |
| 11         | t1   | c4_at_t1 | v_of_c4_at_t1 |
| 11         | t1   | c5_at_t1 | v_of_c5_at_t1 |
| 12         | t2   | c6_at_t2 | v_of_c6_at_t2 |
| 13         | t3   | c9_at_t3 | v_of_c9_at_t3 |
Run Code Online (Sandbox Code Playgroud)

一旦我有了f'{v}_{f[:-4]}.parquet'格式的文件,我就可以使用pandas从所有文件中提取的 ( file_1.csv, file_2.csv)

仪器最终的文件10应该是类似下面,其中在观察t7t9从观察级联仪器10在其他文件中

time | code     | val           |
-----|----------|---------------|
t1   | c1_at_t1 | v_of_c1_at_t1 |
t1   | c2_at_t1 | v_of_c2_at_t1 |
t2   | c1_at_t2 | v_of_c1_at_t2 |
t2   | c3_at_t2 | v_of_c3_at_t2 |
t7   | c4_at_t7 | v_of_c4_at_t7 |
t9   | c5_at_t9 | v_of_c5_at_t9 |
Run Code Online (Sandbox Code Playgroud)

mjs*_*ier 4

我不太确定您需要实现什么目标,但我认为您不需要任何分组来解决您的问题。在我看来,这似乎是一个简单的过滤问题。

您可以循环遍历所有文件并创建新的乐器文件并附加到这些文件上。

另外,我没有可供实验的示例文件,但我认为您也可以使用带有 chunksize 的 pandas 来读取大型 csv 文件。

例子:

import pandas as pd
import glob
import os

# maybe play around to get better performance 
chunksize = 1000000

files = glob.glob('./file_*.csv')
for f in files:

     for chunk in pd.read_csv(f, chunksize=chunksize):
         u_inst = chunk['instrument'].unique()

         for inst in u_inst:
             # filter instrument data
            inst_df = chunk[chunk.instrument == inst]
            # filter columns
            inst_df = inst_df[['time', 'code', 'val']]
            # append to instrument file
            # only write header if not exist yet
            inst_file = f'./instrument_{inst}.csv'
            file_exist = os.path.isfile(inst_file)
            inst_df.to_csv(inst_file, mode='a', header=not file_exist)
Run Code Online (Sandbox Code Playgroud)