12 python csv parallel-processing loops pandas
我有一个8gb的csv文件,我无法运行代码,因为它显示内存错误.
file = "./data.csv"
df = pd.read_csv(file, sep="/", header=0, dtype=str)
Run Code Online (Sandbox Code Playgroud)
我想使用python将文件拆分为8个小文件("按id排序").并且fianlly,有一个循环,以便输出文件将具有所有8个文件的输出.
或者我想尝试并行计算.主要目标是在python pandas中处理8gb数据.谢谢.
我的csv文件包含大量带有'/'作为逗号分隔符的数据,
id venue time code value ......
AAA Paris 28/05/2016 09:10 PAR 45 ......
111 Budapest 14/08/2016 19:00 BUD 62 ......
AAA Tokyo 05/11/2016 23:20 TYO 56 ......
111 LA 12/12/2016 05:55 LAX 05 ......
111 New York 08/01/2016 04:25 NYC 14 ......
AAA Sydney 04/05/2016 21:40 SYD 2 ......
ABX HongKong 28/03/2016 17:10 HKG 5 ......
ABX London 25/07/2016 13:02 LON 22 ......
AAA Dubai 01/04/2016 18:45 DXB 19 ......
.
.
.
.
Run Code Online (Sandbox Code Playgroud)
import numpy as np
from multiprocessing import Pool
def processor(df):
# Some work
df.sort_values('id', inplace=True)
return df
size = 8
df_split = np.array_split(df, size)
cores = 8
pool = Pool(cores)
for n, frame in enumerate(pool.imap(processor, df_split), start=1):
frame.to_csv('{}'.format(n))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
使用该chunksize
参数一次读取一个块并将文件保存到磁盘.这将原始文件分成相等的部分,每行100000行:
file = "./data.csv"
chunks = pd.read_csv(file, sep="/", header=0, dtype=str, chunksize = 100000)
for it, chunk in enumerate(chunks):
chunk.to_csv('chunk_{}.csv'.format(it), sep="/")
Run Code Online (Sandbox Code Playgroud)
如果您知道原始文件的行数,则可以计算精确度chunksize
以将文件拆分为8个相等的部分(nrows/8
).
pandas read_csv有两个参数选项,您可以使用它们来执行您想要执行的操作:
nrows : to specify the number of rows you want to read
skiprows : to specify the first row you want to read
Run Code Online (Sandbox Code Playgroud)
请参阅以下文档:https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
归档时间: |
|
查看次数: |
852 次 |
最近记录: |