AKΛ*_*AKΛ 5 python csv large-data pandas data-augmentation
目前,我已经设法解决了这个问题,但它比我需要的要慢。大约需要:500k 样本需要 1 小时,整个数据集约为 100M 样本,100M 样本需要约 200 小时。
硬件/软件规格:RAM 8GB、Windows 11 64 位、Python 3.8.8
问题:
我有一个 .csv(~13GB)的数据集,其中每个样本都有一个值和几个月的相应开始结束期。我想创建一个数据集,其中每个样本都具有相同的值,但引用每个特定的值月。
例如:
从:
idx | 开始日期 | 结束日期 | 月 | 年 | 值
0 | 2022 年 5 月 20 日 | 2022 年 7 月 20 日 | 0 | 0 | X
到:
0 | 2022 年 5 月 20 日 | 2022 年 7 月 20 日 | 5 | 2022 | X
1 | 2022 年 5 月 20 日 | 2022 年 7 月 20 日 | 6 | 2022 | X
2 | 2022 年 5 月 20 日 | 2022 年 7 月 20 日 | 7 | 2022 | X
想法:设法并行执行(就像 Dask 一样,但我不确定如何完成这项任务)。
我的实现:
在 pandas 中读取块,在字典中扩充,附加到 CSV。使用一个函数,在给定 df 的情况下,计算每个样本从开始日期到结束日期的月份,并为每个月创建一个副本样本,将其附加到字典中。然后它返回最终的字典。
计算是在字典中完成的,因为它们比在 pandas 中计算要快得多。然后,我分块迭代原始 CSV,并在每个块上应用该函数,将生成的增强 df 附加到另一个 csv。
功能:
def augment_to_monthly_dict(chunk):
'''
Function takes a df or subdf data and creates and returns an Augmented dataset with monthly data in
Dictionary form (for efficiency)
'''
dict={}
l=1
for i in range(len(chunk)):#iterate through every sample
# print(str(chunk.iloc[i].APO)[4:6] )
#Find the months and years period
mst =int(float((str(chunk.iloc[i].start)[4:6])))#start month
mend=int(str(chunk.iloc[i].end)[4:6]) #end month
yst =int(str(chunk.iloc[i].start)[:4] )#start year
yend=int(str(chunk.iloc[i].end)[:4] )#end year
if yend==yst:
months=[ m for m in range(mst,mend+1)]
years=[yend for i in range(len(months))]
elif yend==yst+1:# year change at same sample
months=[m for m in range(mst,13)]
years=[yst for i in range(mst,13)]
months= months+[m for m in range(1, mend+1)]
years= years+[yend for i in range(1, mend+1)]
else:
continue
#months is a list of each month in the period of the sample and years is a same
#length list of the respective years eg months=[11,12,1,2] , years=
#[2021,2022,2022,2022]
for j in range(len(months)):#iterate through list of months
#copy the original sample make it a dictionary
tmp=pd.DataFrame(chunk.iloc[i]).transpose().to_dict(orient='records')
#change the month and year values accordingly (they were 0 for initiation)
tmp[0]['month'] = months[j]
tmp[0]['year'] = years[j]
# Here could add more calcs e.g. drop irrelevant columns, change datatypes etc
#to reduce size
#
#-------------------------------------
#Append new row to the Augmented data
dict[l] = tmp[0]
l+=1
return dict
Run Code Online (Sandbox Code Playgroud)
读取原始数据集(.csv ~13GB),使用该函数进行扩充并将结果附加到新的.csv:
chunk_count=0
for chunk in pd.read_csv('enc_star_logar_ek.csv', delimiter=';', chunksize=10000):
chunk.index = chunk.reset_index().index
aug_dict = augment_to_monthly_dict(chunk)#make chunk dictionary to work faster
chunk_count+=1
if chunk_count ==1: #get the column names and open csv write headers and 1st chunk
#Find the dicts keys, the column names only from the first dict(not reading all data)
for kk in aug_dict.values():
key_names = [i for i in kk.keys()]
print(key_names)
break #break after first input dict
#Open csv file and write ';' separated data
with open('dic_to_csv2.csv', 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile,delimiter=';', fieldnames=key_names)
writer.writeheader()
writer.writerows(aug_dict.values())
else: # Save the rest of the data chunks
print('added chunk: ', chunk_count)
with open('dic_to_csv2.csv', 'a', newline='') as csvfile:
writer = csv.DictWriter(csvfile,delimiter=';', fieldnames=key_names)
writer.writerows(aug_dict.values())
Run Code Online (Sandbox Code Playgroud)
当您需要操作列时,Pandas 的效率就会发挥作用时,Pandas 的效率就发挥了作用,为此,Pandas 逐行读取输入,为每列构建一系列数据;这是大量额外的计算,您的问题不会从中受益,事实上只会减慢您的解决方案。
\n您实际上需要操作行,最快的方法是使用标准 csv 模块;您需要做的就是读入一行,写出派生行,然后重复:
\nimport csv\nimport sys\n\nfrom datetime import datetime\n\n\ndef parse_dt(s):\n return datetime.strptime(s, r"%d/%m/%Y")\n\n\ndef get_dt_range(beg_dt, end_dt):\n """\n Returns a range of (month, year) tuples, from beg_dt up-to-and-including end_dt.\n """\n if end_dt < beg_dt:\n raise ValueError(f"end {end_dt} is before beg {beg_dt}")\n\n mo, yr = beg_dt.month, beg_dt.year\n\n dt_range = []\n while True:\n dt_range.append((mo, yr))\n if mo == 12:\n mo = 1\n yr = yr + 1\n else:\n mo += 1\n if (yr, mo) > (end_dt.year, end_dt.month):\n break\n\n return dt_range\n\n\nfname = sys.argv[1]\nwith open(fname, newline="") as f_in, open("output_csv.csv", "w", newline="") as f_out:\n reader = csv.reader(f_in)\n writer = csv.writer(f_out)\n writer.writerow(next(reader)) # transfer header\n\n for row in reader:\n beg_dt = parse_dt(row[1])\n end_dt = parse_dt(row[2])\n for mo, yr in get_dt_range(beg_dt, end_dt):\n row[3] = mo\n row[4] = yr\n writer.writerow(row)\nRun Code Online (Sandbox Code Playgroud)\n而且,为了与一般的 Pandas 进行比较,让我们检查 @abokey\ 的特定 Pandas 解决方案 \xe2\x80\x94I\ 不确定是否有更好的 Pandas 实现,但这有点做了正确的事情:
\nimport sys\nimport pandas as pd\n\nfname = sys.argv[1]\ndf = pd.read_csv(fname)\n\ndf["start date"] = pd.to_datetime(df["start date"], format="%d/%m/%Y")\ndf["end date"] = pd.to_datetime(df["end date"], format="%d/%m/%Y")\n\ndf["month"] = df.apply(\n lambda x: pd.date_range(\n start=x["start date"], end=x["end date"] + pd.DateOffset(months=1), freq="M"\n ).month.tolist(),\n axis=1,\n)\ndf["year"] = df["start date"].dt.year\n\nout = df.explode("month").reset_index(drop=True)\n\nout.to_csv("output_pd.csv")\nRun Code Online (Sandbox Code Playgroud)\n不过,让我们从基础开始,看看程序是否真的做正确的事情。鉴于此输入:
\nidx,start date,end date,month,year,value\n0,20/05/2022,20/05/2022,0,0,X\n0,20/05/2022,20/07/2022,0,0,X\n0,20/12/2022,20/01/2023,0,0,X\nRun Code Online (Sandbox Code Playgroud)\n我的程序,./main.py input.csv产生:
idx,start date,end date,month,year,value\n0,20/05/2022,20/05/2022,5,2022,X\n0,20/05/2022,20/07/2022,5,2022,X\n0,20/05/2022,20/07/2022,6,2022,X\n0,20/05/2022,20/07/2022,7,2022,X\n0,20/12/2022,20/01/2023,12,2022,X\n0,20/12/2022,20/01/2023,1,2023,X\nRun Code Online (Sandbox Code Playgroud)\n我相信这就是您正在寻找的。
\n熊猫解决方案,./main_pd.py input.csv产生:
,idx,start date,end date,month,year,value\n0,0,2022-05-20,2022-05-20,5,2022,X\n1,0,2022-05-20,2022-07-20,5,2022,X\n2,0,2022-05-20,2022-07-20,6,2022,X\n3,0,2022-05-20,2022-07-20,7,2022,X\n4,0,2022-12-20,2023-01-20,12,2022,X\n5,0,2022-12-20,2023-01-20,1,2022,X\nRun Code Online (Sandbox Code Playgroud)\n忽略添加的帧索引列,以及日期格式已更改的事实(我很确定可以使用我不知道的一些 Pandas 指令来修复),它仍然在以下方面做正确的事情使用适当的日期范围创建新行。
\n所以,两者都做正确的事。现在,谈谈性能。我复制了您的初始示例,仅复制了 1 行,即 1_000_000 和 10_000_000 行:
\nimport sys\n\nnrows = int(sys.argv[1])\nwith open(f"input_{nrows}.csv", "w") as f:\n f.write("idx,start date,end date,month,year,value\\n")\n for _ in range(nrows):\n f.write("0,20/05/2022,20/07/2022,0,0,X\\n")\nRun Code Online (Sandbox Code Playgroud)\n我正在运行配备 2TB SSD 的 2020 年 M1 MacBook Air(这提供了非常好的性能)读/写速度):
\n| 1M 行(秒,RAM) | 10M 行(秒,RAM) | |
|---|---|---|
| csv模块 | 7.8秒,6MB | 78s,6MB |
| 熊猫 | 75秒,569MB | 750s,5.8GB |
您可以看到这两个程序的运行时间随着行大小的增加而线性增加。csv 模块的内存始终不存在,因为它正在流式传输数据输入和输出(几乎不保留任何内容);Pandas 的内存随着它必须保存的行的大小而增加,以便它可以再次在整个列上进行实际的日期范围计算上进行实际的日期范围计算。另外,未显示,但对于 10M 行 Pandas 测试,Pandas 仅编写 CSV\xe2\x80\x94 就花费了近 2 分钟,比 csv 模块方法完成整个任务所需的时间还要长。
\n现在,对于我对 Pandas 的所有贬低,解决方案是少得多的行数,并且可能从一开始就没有错误。我在编写 get_dt_range() 时确实遇到了问题,不得不花大约 5 分钟思考它实际需要做什么并进行调试。
\n您可以在此处查看我使用小型测试工具的设置以及结果。
\n