Bru*_*ung 5 python multithreading multiprocessing bigdata pandas
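I am building an in-memory "big data" real-time computation module in Python with the pandas library.
Response time is therefore the key quality of this module and is critical.
To handle large datasets, I split the data and process the sub-splits in parallel.
The part that stores the result of each sub-DataFrame takes a lot of time (line 21: `resultList.append(split_sub_dataframe)`).
I suspect that a deep copy is being made in memory, i.e. the sub-data passed to each process is not shared in memory.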
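The suspicion can be checked directly. `manager.list()` returns a proxy, and each method call on it pickles its arguments and sends them to the manager's server process, so `append` stores a copy, not a reference. A minimal sketch, assuming the `fork` start method (Linux or macOS) and a hypothetical helper `manager_append_demo` with toy data:

```python
import multiprocessing as mp

import pandas as pd


def manager_append_demo():
    """Hypothetical helper: show that Manager().list().append() stores a copy."""
    ctx = mp.get_context('fork')        # assumption: Linux/macOS, as in the question
    with ctx.Manager() as manager:
        result_list = manager.list()    # a proxy; the real list lives in the manager process
        sub = pd.DataFrame({'column_01': [1.0, 2.0, 3.0]})
        result_list.append(sub)         # sub is pickled and sent to the manager process
        sub.loc[0, 'column_01'] = 99.0  # change the local object after appending
        stored = result_list[0]         # fetched back by unpickling: yet another copy
        return sub.loc[0, 'column_01'], stored.loc[0, 'column_01'], stored is sub


if __name__ == "__main__":
    local_value, stored_value, same_object = manager_append_demo()
    print(local_value, stored_value, same_object)   # 99.0 1.0 False
```

With a sub-DataFrame of hundreds of millions of rows, that pickle/unpickle round trip is exactly the cost that shows up around the `append` call.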
If I were writing the module in C or C++, I would use a pointer or a reference, like this:
process = Process(target=addNewDerivedColumn, args=[resultList, &sub_dataframe])
or
process = Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
def addNewDerivedColumn(resultList, split_sub_dataframe&): ....
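Is there a good way to avoid the deep memory copy, or otherwise reduce the multiprocessing time? "Not elegant" is fine; I'm prepared to make my code dirty. I have tried weakref, RawValue, RawArray, Value and Pool, but all of them failed.
The module is being developed on macOS and will eventually run on Linux or Unix.
Don't consider Windows.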
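Since `RawArray` comes up above, here is one way it can work on the output side: a minimal sketch, assuming the `fork` start method (Linux, or macOS with `fork` requested explicitly), in which every child writes its rows straight into one shared float64 buffer, so nothing is pickled back through a `Manager`. The names `fill_chunk` and `add_new_column_shared` are hypothetical; the formula is copied from the question's code.

```python
import multiprocessing as mp

import numpy as np
import pandas as pd


def fill_chunk(shared_out, df, start, end):
    # Child: view the shared buffer as float64 (no copy) and fill rows [start, end).
    out = np.frombuffer(shared_out, dtype=np.float64)
    col = df['column_01'].to_numpy()[start:end]
    out[start:end] = np.abs(col + col) / 2


def add_new_column_shared(df, n_procs=5):
    ctx = mp.get_context('fork')             # assumption: fork-capable OS; df is inherited, not pickled
    shared_out = ctx.RawArray('d', len(df))  # lock-free shared buffer; the chunks never overlap
    edges = np.linspace(0, len(df), n_procs + 1).astype(int)
    procs = [ctx.Process(target=fill_chunk, args=(shared_out, df, int(s), int(e)))
             for s, e in zip(edges[:-1], edges[1:])]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Parent: wrap the filled buffer as the new column (the array keeps shared_out alive).
    df['new_column'] = np.frombuffer(shared_out, dtype=np.float64)
    return df


if __name__ == "__main__":
    frame = pd.DataFrame(np.random.randn(1000, 4),
                         columns=['column_01', 'column_02', 'column_03', 'column_04'])
    print(add_new_column_shared(frame).head())
```

No lock is used because each child writes a disjoint slice; if chunks could overlap, `ctx.Array` (which carries a lock) would be the safer choice.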
Here is the code.
The real code is in my office, but its structure and logic are the same as the real one.
# -*- coding: utf-8 -*-
import time

import numpy as np
import pandas as pd
from multiprocessing import Manager, Process


def addNewDerivedColumn(resultList, split_sub_dataframe):

    split_sub_dataframe['new_column'] = np.abs(split_sub_dataframe['column_01'] + split_sub_dataframe['column_01']) / 2

    print(split_sub_dataframe.head())

    # I think the whole resulting sub-DataFrame is copied into resultList, not passed by reference,
    # and this is where most of the time is spent.
    # Compare the elapsed time with the resultList.append(...) line below commented out and enabled:
    # on MS Windows there is no significant difference in elapsed time,
    # on Linux or macOS the difference is large.
    resultList.append(split_sub_dataframe)


if __name__ == "__main__":

    # example data generation
    # the real data has over 1 billion records with about 10 columns.
    dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])

    print('start...')
    start_time = time.time()

    # to launch 5 processes in parallel, split the DataFrame into five sub-DataFrames
    split_dataframe_list = np.array_split(dataframe, 5)

    # multiprocessing
    manager = Manager()

    # result list
    resultList = manager.list()
    processList = []

    for sub_dataframe in split_dataframe_list:
        process = Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
        processList.append(process)

    for proc in processList:
        proc.start()
    for proc in processList:
        proc.join()

    print('elapsed time : ', np.round(time.time() - start_time, 3))
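You will get better performance if you keep interprocess communication to a minimum. So instead of passing sub-DataFrames as arguments, pass only index values. The subprocesses can slice the common DataFrame themselves.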
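To see why index values are so much cheaper, compare what a `Pool` task would have to pickle in each case. A minimal sketch with a hypothetical helper `ipc_payload_sizes` and random data sized like one of five chunks:

```python
import pickle

import numpy as np
import pandas as pd


def ipc_payload_sizes(n_rows=1_000_000, n_chunks=5):
    """Hypothetical helper: bytes pickled for a sub-DataFrame vs. a (start, end) pair."""
    df = pd.DataFrame(np.random.randn(n_rows, 4),
                      columns=['column_01', 'column_02', 'column_03', 'column_04'])
    end = n_rows // n_chunks
    as_frame = len(pickle.dumps(df.iloc[0:end]))  # every float in the chunk is serialized
    as_index = len(pickle.dumps((0, end)))        # only two integers are serialized
    return as_frame, as_index


if __name__ == "__main__":
    frame_bytes, index_bytes = ipc_payload_sizes()
    print(f"sub-DataFrame: {frame_bytes:,} bytes, (start, end): {index_bytes} bytes")
```

The sub-DataFrame costs at least its raw data size (rows x columns x 8 bytes) every time it crosses a process boundary, while the index pair stays at a few dozen bytes regardless of the data size.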
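When a subprocess is forked, it gets a copy of all the global variables defined in the parent's calling module. So if a large DataFrame `df` is defined as a global before the multiprocessing pool is created, every spawned subprocess can access `df`.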
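A minimal sketch of that inheritance, assuming the `fork` start method (requested explicitly, since it is not the default everywhere) and using a NumPy array `big` as a stand-in for `df`; `chunk_sum` and `parallel_sum` are hypothetical names. The workers receive only `(start, end)` pairs and read the global they inherited:

```python
import multiprocessing as mp

import numpy as np

big = None  # module-level global; filled in by the parent before the pool exists


def chunk_sum(bounds):
    start, end = bounds
    # 'big' was never sent to this worker: it was inherited through fork()
    return float(big[start:end].sum())


def parallel_sum(n=1_000_000, n_chunks=5):
    global big
    big = np.arange(n, dtype=np.float64)            # must exist BEFORE the pool is created
    edges = np.linspace(0, n, n_chunks + 1).astype(int)
    tasks = list(zip(edges[:-1].tolist(), edges[1:].tolist()))
    with mp.get_context('fork').Pool(n_chunks) as pool:  # assumption: fork-capable OS
        parts = pool.map(chunk_sum, tasks)
    return sum(parts), float(big.sum())


if __name__ == "__main__":
    print(parallel_sum())
```

If the pool were created before `big` is assigned, the workers would still see `None`, which is why the ordering matters.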
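On Windows, which has no `fork()`, a new Python process is started and the calling module is imported into it. So on Windows each spawned subprocess has to regenerate `df` from scratch, which can take time and extra memory. The same applies on macOS with Python 3.8 and later, where the default start method is `spawn` rather than `fork`.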
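On Linux, however, you have copy-on-write. This means the spawned subprocesses access the original global variables (of the calling module) without copying them. Only when a subprocess tries to modify a global does Linux make a separate copy, page by page, before the value is modified.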
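The flip side of copy-on-write is that a child's writes never reach the parent. A minimal sketch, assuming the `fork` start method; `modify_in_child` and `cow_demo` are hypothetical names:

```python
import multiprocessing as mp

import numpy as np

shared_global = np.zeros(5)  # inherited by the child through fork()


def modify_in_child(queue):
    shared_global[:] = 1.0                  # the write makes the kernel copy the touched page
    queue.put(float(shared_global.sum()))   # the child sees its own modified copy


def cow_demo():
    ctx = mp.get_context('fork')            # assumption: Linux/macOS
    queue = ctx.Queue()
    proc = ctx.Process(target=modify_in_child, args=(queue,))
    proc.start()
    child_view = queue.get()                # read before join() so the queue can drain
    proc.join()
    return child_view, float(shared_global.sum())


if __name__ == "__main__":
    print(cow_demo())
```

The parent's array is untouched after the child has written to it, which is why results have to be sent back explicitly (or written into shared memory) rather than assigned to globals in the child.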
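So you can get a performance boost by not modifying globals in the subprocesses. I suggest using the subprocesses only for computation: return the computed values and let the main process collate the results and modify the original DataFrame.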
import multiprocessing as mp
import time

import numpy as np
import pandas as pd


def compute(start, end):
    # Runs in a worker: slice the inherited global df instead of receiving the data as an argument.
    sub = df.iloc[start:end]
    return start, end, np.abs(sub['column_01'] + sub['column_01']) / 2


def collate(retval):
    # Runs in the parent (Pool callback): write the returned chunk into the original DataFrame.
    start, end, arr = retval
    # .ix has been removed from pandas; .iloc is positional and end-exclusive, matching compute().
    df.iloc[start:end, df.columns.get_loc('new_column')] = arr.to_numpy()


def window(seq, n=2):
    """
    Returns a sliding window (of width n) over data from the sequence
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    for i in range(len(seq) - n + 1):
        yield tuple(seq[i:i + n])


if __name__ == "__main__":
    # the record count of the real data is over 1 billion with about 10 columns.
    N = 10**3
    df = pd.DataFrame(np.random.randn(N, 4),
                      columns=['column_01', 'column_02', 'column_03', 'column_04'])
    # The workers are forked here, after df exists, so they inherit it.
    # 'fork' is requested explicitly because macOS (Python 3.8+) defaults to 'spawn'.
    pool = mp.get_context('fork').Pool()
    df['new_column'] = np.empty(N, dtype='float')
    start_time = time.time()
    idx = np.linspace(0, N, 5 + 1).astype('int')
    for start, end in window(idx, 2):
        # print(start, end)
        pool.apply_async(compute, args=[start, end], callback=collate)
    pool.close()
    pool.join()
    print('elapsed time : ', np.round(time.time() - start_time, 3))
    print(df.head())