use*_*212 7 python matrix multiprocessing
我正在创建一个进程池,每个进程池都需要在主程序中存在的矩阵的不同部分进行编写.不存在覆盖信息的担心,因为每个过程将与矩阵的不同行一起工作.如何使矩阵在流程中可写?
该程序是教授指定我的矩阵乘数,必须进行多处理.它将为计算机的每个核心创建一个进程.主程序将矩阵的不同部分发送给进程,然后他们将计算它们,然后它们将以一种方式返回它们,我可以识别哪个响应对应于它所基于的行.
您是否尝试过使用multiprocessing.Array类来建立一些共享内存?
另请参阅文档中的示例:
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print num.value
print arr[:]
Run Code Online (Sandbox Code Playgroud)
只需将其扩展为h*w
带有i*w+j
样式索引的大小矩阵.然后,使用Process Pool添加多个进程.
如果重新使用过程,则创建新过程或在它们之间复制矩阵的成本将使矩阵乘法的成本蒙上阴影。无论如何,numpy.dot()
可以自己利用不同的CPU内核。
矩阵乘法可以在进程间通过计算结果在不同的过程,例如不同的行中,给定的输入矩阵被分布a
并b
然后将结果(i,j)
元素是:
out[i,j] = sum(a[i,:] * b[:,j])
Run Code Online (Sandbox Code Playgroud)
因此,i
第-行可以计算为:
import numpy as np
def dot_slice(a, b, out, i):
t = np.empty_like(a[i,:])
for j in xrange(b.shape[1]):
# out[i,j] = sum(a[i,:] * b[:,j])
np.multiply(a[i,:], b[:,j], t).sum(axis=1, out=out[i,j])
Run Code Online (Sandbox Code Playgroud)
numpy
数组接受切片作为索引,例如,a[1:3,:]
返回第二行和第三行。
a
,b
为只读,这样他们可以被继承为是由子进程(利用写入时复制在Linux),其结果是使用共享阵列来计算。在计算期间仅复制索引:
import ctypes
import multiprocessing as mp
def dot(a, b, nprocesses=mp.cpu_count()):
"""Perform matrix multiplication using multiple processes."""
if (a.shape[1] != b.shape[0]):
raise ValueError("wrong shape")
# create shared array
mp_arr = mp.RawArray(ctypes.c_double, a.shape[0]*b.shape[1])
# start processes
np_args = mp_arr, (a.shape[0], b.shape[1]), a.dtype
pool = mp.Pool(nprocesses, initializer=init, initargs=(a, b)+np_args)
# perform multiplication
for i in pool.imap_unordered(mpdot_slice, slices(a.shape[0], nprocesses)):
print("done %s" % (i,))
pool.close()
pool.join()
# return result
return tonumpyarray(*np_args)
Run Code Online (Sandbox Code Playgroud)
哪里:
def mpdot_slice(i):
dot_slice(ga, gb, gout, i)
return i
def init(a, b, *np_args):
"""Called on each child process initialization."""
global ga, gb, gout
ga, gb = a, b
gout = tonumpyarray(*np_args)
def tonumpyarray(mp_arr, shape, dtype):
"""Convert shared multiprocessing array to numpy array.
no data copying
"""
return np.frombuffer(mp_arr, dtype=dtype).reshape(shape)
def slices(nitems, mslices):
"""Split nitems on mslices pieces.
>>> list(slices(10, 3))
[slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]
>>> list(slices(1, 3))
[slice(0, 1, None), slice(1, 1, None), slice(2, 1, None)]
"""
step = nitems // mslices + 1
for i in xrange(mslices):
yield slice(i*step, min(nitems, (i+1)*step))
Run Code Online (Sandbox Code Playgroud)
要测试它:
def test():
n = 100000
a = np.random.rand(50, n)
b = np.random.rand(n, 60)
assert np.allclose(np.dot(a,b), dot(a,b, nprocesses=2))
Run Code Online (Sandbox Code Playgroud)
在Linux上,此多处理版本与使用线程并在计算过程中释放GIL(在C扩展中)的解决方案具有相同的性能:
$ python -mtimeit -s'from test_cydot import a,b,out,np' 'np.dot(a,b,out)'
100 loops, best of 3: 9.05 msec per loop
$ python -mtimeit -s'from test_cydot import a,b,out,cydot' 'cydot.dot(a,b,out)'
10 loops, best of 3: 88.8 msec per loop
$ python -mtimeit -s'from test_cydot import a,b; import mpdot' 'mpdot.dot(a,b)'
done slice(49, 50, None)
..[snip]..
done slice(35, 42, None)
10 loops, best of 3: 82.3 msec per loop
Run Code Online (Sandbox Code Playgroud)
注意:测试已更改为可np.float64
在任何地方使用。
矩阵乘法意味着结果矩阵的每个元素被单独计算。这似乎是Pool的工作。由于它是家庭作业(并且还要遵循 SO 代码),我只会说明 Pool 本身的使用,而不是整个解决方案。
因此,您必须编写一个例程来计算结果矩阵的第 (i, j) 个元素:
def getProductElement(m1, m2, i, j):
# some calculations
return element
Run Code Online (Sandbox Code Playgroud)
然后初始化池:
from multiprocessing import Pool, cpu_count
pool = Pool(processes=cpu_count())
Run Code Online (Sandbox Code Playgroud)
然后您需要提交作业。您也可以将它们组织在矩阵中,但为什么要麻烦呢,让我们列一个列表吧。
result = []
# here you need to iterate through the the columns of the first and the rows of
# the second matrix. How you do it, depends on the implementation (how you store
# the matrices). Also, make sure you check the dimensions are the same.
# The simplest case is if you have a list of columns:
N = len(m1)
M = len(m2[0])
for i in range(N):
for j in range(M):
results.append(pool.apply_async(getProductElement, (m1, m2, i, j)))
Run Code Online (Sandbox Code Playgroud)
然后用结果填充结果矩阵:
m = []
count = 0
for i in range(N):
column = []
for j in range(M):
column.append(results[count].get())
m.append(column)
Run Code Online (Sandbox Code Playgroud)
同样,代码的确切形状取决于您如何表示矩阵。