Han*_*uys 9 python parallel-processing numpy pool multiprocessing
I spent several hours trying to parallelize my number-crunching code, but it only got slower when I did so. Unfortunately, the problem disappears when I try to reduce it to the example below, and I don't really want to post the whole program here. So the question is: what pitfalls should I avoid in this kind of program?
(Note: a follow-up after Unutbu's answer is at the bottom.)
Here are the circumstances:
- BigData is a class containing a lot of internal data. In the example there is one list ff of interpolation functions; in the actual program there are more, e.g. ffA[k], ffB[k], ffC[k].
- The work can be done on small chunks of data at a time; in the example, the per-chunk work is do_chunk(). With the approach shown, do_single() runs in 5 seconds while do_multi() takes 55 seconds.
- I also tried splitting the work by slicing the xi and yi arrays into contiguous blocks and iterating over all k values within each block. That worked a bit better: now there was no difference in total execution time whether I used 1, 2, 3, or 4 threads. But of course, I want to see an actual speedup!
- Elsewhere in the program I did get a significant speed boost from a multiprocessing pool, for a standalone function like def do_chunk(array1, array2, array3) that does numpy-only calculations on its arrays.

#!/usr/bin/python2.7
import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = Pool(numproc)
        stopwatch('Pool setup')
        for k in range(self.n):
            p = pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        sum = 0.0
        for k in range(self.n):
            # Edit/bugfix: replaced p.get by procs[k].get
            sum += np.sum(procs[k].get(timeout=30))  # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return sum

    def do_single(self, xi, yi):
        sum = 0.0
        for k in range(self.n):
            sum += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return sum

def _do_chunk_wrapper(bd, k, xi, yi):  # must be outside the class so apply_async can pickle it
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)
Output:
Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds
The timings are on an Intel Core i3-3227 CPU with 2 cores and 4 threads, running 64-bit Linux. For the actual program, the multiprocessing version (the pool mechanism, even when using only one core) was a factor of 10 slower than the single-process version.

Follow-up

Unutbu's answer got me on the right track. In the actual program, self was pickled into a 37 to 140 MB object that had to be passed to the worker processes. Worse, Python pickling is quite slow; the pickling itself took several seconds, and it happened for every chunk of work passed to a worker process. Apart from pickling and passing large data objects, the overhead of apply_async on Linux is very small: for a small function (adding a few integer arguments), it takes only 0.2 ms per apply_async/get pair. So dividing the work into very small chunks is not a problem by itself. Therefore, I now transmit all big array arguments as indices into global variables, and I keep the chunk size small for the sake of CPU cache optimization.

The big objects are stored in a global dict; the entries are deleted in the parent process immediately after the worker pool is set up, so only the keys of the dict are transmitted to the workers. The only big data that still goes through pickling/IPC is the new data created by the workers.
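For reference, the per-call overhead of apply_async can be measured with a micro-benchmark along these lines (a minimal sketch; add3 is just an arbitrary trivial function I made up for the test, and the exact figure depends on the machine and Python version):

#!/usr/bin/python2.7
import time
from multiprocessing import Pool

def add3(a, b, c):
    # trivial worker: with almost no real work, the measured time is mostly IPC overhead
    return a + b + c

if __name__ == "__main__":
    pool = Pool(2)
    n_calls = 1000
    t0 = time.time()
    results = [pool.apply_async(add3, (i, i, i)) for i in range(n_calls)]
    total = sum(r.get(timeout=30) for r in results)
    dt = time.time() - t0
    print("%d calls: %.3f ms per apply_async/get pair (checksum %d)"
          % (n_calls, 1000.0 * dt / n_calls, total))
    pool.close()
    pool.join()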
#!/usr/bin/python2.7

import numpy as np, sys
from multiprocessing import Pool

_mproc_data = {}  # global storage for objects during multiprocessing.

class BigData:
    def __init__(self, size):
        self.blah = np.random.uniform(0, 1, size=size)

    def do_chunk(self, k, xi, yi):
        # do the work and return an array of the same shape as xi, yi
        zi = k*np.ones_like(xi)
        return zi

    def do_all_work(self, xi, yi, num_proc):
        global _mproc_data
        mp_key = str(id(self))
        _mproc_data['bd'+mp_key] = self  # BigData
        _mproc_data['xi'+mp_key] = xi
        _mproc_data['yi'+mp_key] = yi
        pool = Pool(processes=num_proc)
        # processes have now inherited the global variable; clean up in the parent process
        for v in ['bd', 'xi', 'yi']:
            del _mproc_data[v+mp_key]
        # setup indices for the worker processes (placeholder)
        n_chunks = 45
        n = len(xi)
        chunk_len = n//n_chunks
        i1list = np.arange(0,n,chunk_len)
        i2list = i1list + chunk_len
        i2list[-1] = n
        klist = range(n_chunks)  # placeholder
        procs = []
        for i in range(n_chunks):
            p = pool.apply_async( _do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]) )
            sys.stderr.write(".")
            procs.append(p)
        sys.stderr.write("\n")
        # allocate space for combined results
        zi = np.zeros_like(xi)
        # get data from workers and finish
        for i, p in enumerate(procs):
            zi[i1list[i]:i2list[i]] = p.get(timeout=30)  # timeout allows ctrl-C handling
        pool.close()
        pool.join()
        return zi

def _do_chunk_wrapper(key, i1, i2, k):
    """All arguments are small objects."""
    global _mproc_data
    bd = _mproc_data['bd'+key]
    xi = _mproc_data['xi'+key][i1:i2]
    yi = _mproc_data['yi'+key][i1:i2]
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi, yi, 4)
Here are the results of a speed test (again with 2 cores / 4 threads), varying the number of worker processes and the amount of memory per chunk (the total number of bytes in the xi, yi, zi array slices). The numbers are "million result values per second", but that doesn't matter much for the comparison. The "1 process" row is a direct call of do_chunk on the full input data, without any subprocesses.
#Proc   125K    250K    500K    1000K   unlimited
1                                       0.82
2       4.28    1.96    1.3     1.31
3       2.69    1.06    1.06    1.07
4       2.17    1.27    1.23    1.28
The effect of the in-memory data size is quite large. The CPU has 3 MB of shared L3 cache plus 256 KB of L2 cache per core. Note that the calculation also needs access to several MB of internal data of the BigData object. Hence, what we learn from this is that it is useful to do this kind of speed test. For this program, 2 processes is fastest, followed by 4, with 3 being the slowest.
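A sweep over process counts and chunk sizes like the one behind this table can be scripted roughly as follows (a minimal sketch; it assumes do_all_work is extended with an n_chunks parameter instead of the hard-coded placeholder above, and the example chunk lengths are arbitrary):

import time

def sweep(bd, xi, yi, proc_counts, chunk_lens):
    """Time do_all_work for each (process count, chunk length) combination.

    Assumes do_all_work has been extended to accept n_chunks
    (the version above hard-codes n_chunks = 45 as a placeholder).
    """
    for num_proc in proc_counts:
        for chunk_len in chunk_lens:
            n_chunks = max(1, len(xi) // chunk_len)
            t0 = time.time()
            bd.do_all_work(xi, yi, num_proc, n_chunks=n_chunks)
            rate = len(xi) / (time.time() - t0) / 1e6
            print("%d proc, %d values/chunk: %.2f Mvalues/s"
                  % (num_proc, chunk_len, rate))

# e.g.: sweep(bd, xi, yi, proc_counts=[1, 2, 3, 4],
#             chunk_lens=[4000, 8000, 16000, 32000])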
unu*_*tbu 13
Try to reduce interprocess communication. In the multiprocessing module all (single-machine) interprocess communication is done through Queues. Objects passed through a Queue are pickled. So try to send fewer and/or smaller objects through the Queue.
Do not send self, the instance of BigData, through the Queue. It is rather big, and gets bigger as the amount of data in self grows:
In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187
Every time pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi)) is called, self is pickled in the main process and unpickled in the worker process. The size of len(pickle.dumps(BigData(N))) grows as N increases.
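To see how much of each apply_async call goes into serialization alone, the pickling step can be timed directly (a minimal sketch; the commented-out usage assumes the BigData class from the question is in scope):

import pickle
import time

def pickle_cost(obj):
    """Return (size in bytes, seconds) for a single pickle.dumps call."""
    t0 = time.time()
    payload = pickle.dumps(obj)
    return len(payload), time.time() - t0

# with the BigData class from the question in scope:
# size, secs = pickle_cost(BigData(50))
# print("%d bytes, %.3f s per pickle" % (size, secs))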
Let the data be read from global variables. On Linux, you can take advantage of copy-on-write. As Jan-Philip Gehrcke explains:
After fork(), parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That's [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification, the corresponding bits and pieces are copied into the memory space of the child.
Thus, you can avoid passing instances of BigData through the Queue by simply defining the instance as a global, bd = BigData(n) (as you are already doing), and referring to its values in the worker processes (e.g. _do_chunk_wrapper). It basically amounts to removing self from the call to pool.apply_async:
p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
and accessing bd as a global, with the attendant changes to the call signature of _do_chunk_wrapper.
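A stripped-down illustration of that pattern (a minimal sketch, with a plain array standing in for the BigData instance; it relies on fork() on Linux as described above, so the workers inherit the global rather than receiving it through the Queue):

import numpy as np
from multiprocessing import Pool

def work(k):
    # bd is not passed as an argument; the worker reads the global it
    # inherited from the parent at fork() time (copy-on-write on Linux)
    return np.sum(bd[k])

if __name__ == "__main__":
    bd = np.random.uniform(size=(8, 1000))  # stand-in for the BigData instance
    pool = Pool(2)                          # workers fork *after* bd exists
    results = [pool.apply_async(work, (k,)) for k in range(8)]
    print(sum(r.get(timeout=30) for r in results))
    pool.close()
    pool.join()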
Try to pass longer-running functions, func, to pool.apply_async. If you have many quickly-completing calls to pool.apply_async, then the overhead of passing arguments and return values through the Queue becomes a significant part of the overall time. If instead you make fewer calls to pool.apply_async and give each func more work to do before returning a result, then interprocess communication becomes a smaller fraction of the overall time.
Below, I modified _do_chunk_wrapper to accept k_start and k_end arguments, so that each call to pool.apply_async computes the sum over many values of k before returning a result.
import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = interpolate.RectBivariateSpline(
                np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        n = self.n
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
        s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
                for k in range(k_start, k_end))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = mp.Pool(numproc)
        stopwatch('\nPool setup')
        ks = list(map(int, np.linspace(0, self.n, numproc+1)))
        for i in range(len(ks)-1):
            k_start, k_end = ks[i:i+2]
            p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        total = 0.0
        for k, p in enumerate(procs):
            total += np.sum(p.get(timeout=30))  # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        print(total)
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return total

    def do_single(self, xi, yi):
        total = 0.0
        for k in range(self.n):
            total += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return total

def _do_chunk_wrapper(k_start, k_end, xi, yi):
    return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)
which yields:
Initialized: 0.15 seconds
Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds
Compared to the original code:
Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds