RTM*_*RTM 6 python pandas dask
我有一大组 csv 文件 ( file_1.csv, file_2.csv),按时间段分隔,无法放入内存。每个文件都将采用下面提到的格式。
| instrument | time | code | val |
|------------|------|----------|---------------|
| 10 | t1 | c1_at_t1 | v_of_c1_at_t1 |
| 10 | t1 | c2_at_t1 | v_of_c2_at_t1 |
| 10 | t2 | c1_at_t2 | v_of_c1_at_t2 |
| 10 | t2 | c3_at_t2 | v_of_c3_at_t2 |
| 11 | t1 | c4_at_t1 | v_of_c4_at_t1 |
| 11 | t1 | c5_at_t1 | v_of_c5_at_t1 |
| 12 | t2 | c6_at_t2 | v_of_c6_at_t2 |
| 13 | t3 | c9_at_t3 | v_of_c9_at_t3 |
Run Code Online (Sandbox Code Playgroud)
每个文件都是关于格式一致的仪器日志。有一组工具可以code在给定的时间戳(time)发出不同的代码()。的该值code在给定的一个time适合特定的工具被保存在val列
我想file_1.csv使用instrument 列(例如:)拆分每个文件(例如:)10,然后10在所有文件(file_1.csv,file_2.csv)中加入为仪器(例如:)提取的文件
我正在考虑dask在instrument列上使用groupby 操作。是否有任何替代或更好的方法来代替使用groupby或更好的方法来提取文件instrument?
我为执行上述操作而编写的代码是
| instrument | time | code | val |
|------------|------|----------|---------------|
| 10 | t1 | c1_at_t1 | v_of_c1_at_t1 |
| 10 | t1 | c2_at_t1 | v_of_c2_at_t1 |
| 10 | t2 | c1_at_t2 | v_of_c1_at_t2 |
| 10 | t2 | c3_at_t2 | v_of_c3_at_t2 |
| 11 | t1 | c4_at_t1 | v_of_c4_at_t1 |
| 11 | t1 | c5_at_t1 | v_of_c5_at_t1 |
| 12 | t2 | c6_at_t2 | v_of_c6_at_t2 |
| 13 | t3 | c9_at_t3 | v_of_c9_at_t3 |
Run Code Online (Sandbox Code Playgroud)
一旦我有了f'{v}_{f[:-4]}.parquet'格式的文件,我就可以使用pandas从所有文件中提取的 ( file_1.csv, file_2.csv)
仪器最终的文件10应该是类似下面,其中在观察t7,t9从观察级联仪器10在其他文件中
time | code | val |
-----|----------|---------------|
t1 | c1_at_t1 | v_of_c1_at_t1 |
t1 | c2_at_t1 | v_of_c2_at_t1 |
t2 | c1_at_t2 | v_of_c1_at_t2 |
t2 | c3_at_t2 | v_of_c3_at_t2 |
t7 | c4_at_t7 | v_of_c4_at_t7 |
t9 | c5_at_t9 | v_of_c5_at_t9 |
Run Code Online (Sandbox Code Playgroud)
我不太确定您需要实现什么目标,但我认为您不需要任何分组来解决您的问题。在我看来,这似乎是一个简单的过滤问题。
您可以循环遍历所有文件并创建新的乐器文件并附加到这些文件上。
另外,我没有可供实验的示例文件,但我认为您也可以使用带有 chunksize 的 pandas 来读取大型 csv 文件。
例子:
import pandas as pd
import glob
import os
# maybe play around to get better performance
chunksize = 1000000
files = glob.glob('./file_*.csv')
for f in files:
for chunk in pd.read_csv(f, chunksize=chunksize):
u_inst = chunk['instrument'].unique()
for inst in u_inst:
# filter instrument data
inst_df = chunk[chunk.instrument == inst]
# filter columns
inst_df = inst_df[['time', 'code', 'val']]
# append to instrument file
# only write header if not exist yet
inst_file = f'./instrument_{inst}.csv'
file_exist = os.path.isfile(inst_file)
inst_df.to_csv(inst_file, mode='a', header=not file_exist)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
764 次 |
| 最近记录: |