Ish*_*mar · 10 · tags: python, arrays, multithreading, multiprocessing
I have a function that produces an MxN array. The array is very large, so I want to use that function to generate smaller arrays (M1xN, M2xN, M3xN, ..., MixN, with M1 + M2 + M3 + ... + Mi = M) simultaneously using multiprocessing/threading, and finally join these arrays to form the MxN array. As Mr Boardrider rightly suggested providing a workable example, the example below will broadly convey what I intend to do:
import numpy as n

def mult(y, x):
    r = n.empty([len(y), len(x)])
    for i in range(len(r)):
        r[i] = y[i] * x
    return r

x = n.random.rand(10000)
y = n.arange(0, 100000, 1)
test = mult(y=y, x=x)
As the lengths of x and y increase, the system will take more and more time. For this example, I want to run this code such that if I have 4 cores, I can give a quarter of the job to each: i.e. give the job of computing elements r[0] to r[24999] to the 1st core, r[25000] to r[49999] to the 2nd core, r[50000] to r[74999] to the 3rd core, and r[75000] to r[99999] to the 4th core, and eventually club the results together, appending them into a single array r[0] to r[99999].
I hope this example makes things clear. If my problem is still not clear, please tell me.
The first thing to say is: if it's about multiple cores on the same processor, numpy is already capable of parallelizing the operation better than we could ever do by hand (see the discussion at multiplication of large arrays in python).
In this case the key would be simply to ensure that the multiplication is all done in a wholesale array operation rather than a Python for-loop:
test2 = x[n.newaxis, :] * y[:, n.newaxis]
n.abs( test - test2 ).max() # verify equivalence to mult(): output should be 0.0, or very small reflecting floating-point precision limitations
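To see the difference in practice, here is a quick, purely illustrative timing comparison of the row-by-row Python loop against the broadcast version (array sizes are kept smaller than in the question so it runs quickly; absolute timings will vary by machine):

```python
import timeit
import numpy as n

def mult(y, x):
    # original row-by-row Python loop
    r = n.empty([len(y), len(x)])
    for i in range(len(r)):
        r[i] = y[i] * x
    return r

x = n.random.rand(2000)
y = n.arange(0, 5000, 1.0)
t_loop = timeit.timeit(lambda: mult(y, x), number=3)
t_bcast = timeit.timeit(lambda: x[n.newaxis, :] * y[:, n.newaxis], number=3)
print('loop: %.3fs   broadcast: %.3fs' % (t_loop, t_bcast))
```

On typical hardware the broadcast version wins comfortably, because the work moves out of the Python interpreter and into numpy's compiled loops.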
(If you really wanted to spread it across multiple separate CPUs, that's a different matter, but the question seems to suggest a single multi-core CPU.)
OK, bearing all that in mind: let's suppose you want to parallelize an operation more complicated than just mult(). Let's assume you've tried hard to optimize your operation into the kind of wholesale array operations that numpy can parallelize itself, but your operation just isn't susceptible to this. In that case, you can use a shared-memory multiprocessing.Array created with lock=False, and a multiprocessing.Pool to assign processes to address non-overlapping chunks of it, divided up over the y dimension (and over x too, if you want). An example listing is provided below. Note that this approach does not explicitly do exactly what you specified (club the results together and append them into a single array). Rather, it does something more efficient: multiple processes simultaneously assemble their portions of the answer in non-overlapping portions of shared memory. Once done, no collation/appending is necessary: we just read out the result.
import os, numpy, multiprocessing, itertools

# the best way to get multiprocessing.Pool to send shared multiprocessing.Array
# objects between processes is to attach them to something global -
# see http://stackoverflow.com/questions/1675766/
SHARED_VARS = {}

def operate( slices ):
    # grok the inputs
    yslice, xslice = slices
    y, x, r = get_shared_arrays('y', 'x', 'r')
    # create views of the appropriate chunks/slices of the arrays:
    y = y[yslice]
    x = x[xslice]
    r = r[yslice, xslice]
    # do the actual business
    for i in range(len(r)):
        r[i] = y[i] * x  # If this is truly all operate() does, it can be parallelized far more efficiently by numpy itself.
                         # But let's assume this is a placeholder for something more complicated.
    return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size)

def check(y, x, r):
    r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis] # obviously this check will only be valid if operate() literally does only multiplication (in which case this whole business is unnecessary)
    print( 'max. abs. diff. = %g' % numpy.abs(r - r2).max() )
    return y, x, r

def slicestr(s):
    return ':'.join( '' if x is None else str(x) for x in [s.start, s.stop, s.step] )

def m2n(buf, shape, typecode, ismatrix=False):
    """
    Return a numpy.array VIEW of a multiprocessing.Array given a
    handle to the array, the shape, the data typecode, and a boolean
    flag indicating whether the result should be cast as a matrix.
    """
    a = numpy.frombuffer(buf, dtype=typecode).reshape(shape)
    if ismatrix: a = numpy.asmatrix(a)
    return a

def n2m(a):
    """
    Return a multiprocessing.Array COPY of a numpy.array, together
    with shape, typecode and matrix flag.
    """
    if not isinstance(a, numpy.ndarray): a = numpy.array(a)
    return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix)

def new_shared_array(shape, typecode='d', ismatrix=False):
    """
    Allocate a new shared array and return all the details required
    to reinterpret it as a numpy array or matrix (same order of
    output arguments as n2m)
    """
    typecode = numpy.dtype(typecode).char
    return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix

def get_shared_arrays(*names):
    return [m2n(*SHARED_VARS[name]) for name in names]

def init(*pargs, **kwargs):
    SHARED_VARS.update(pargs, **kwargs)

if __name__ == '__main__':
    ylen = 1000
    xlen = 2000

    init( y=n2m(range(ylen)) )
    init( x=n2m(numpy.random.rand(xlen)) )
    init( r=new_shared_array([ylen, xlen], float) )

    print('Master process ID is %s' % os.getpid())
    #print( operate([slice(None), slice(None)]) ); check(*get_shared_arrays('y', 'x', 'r'))  # local test

    pool = multiprocessing.Pool(initializer=init, initargs=SHARED_VARS.items())
    yslices = [slice(0,333), slice(333,666), slice(666,None)]
    xslices = [slice(0,1000), slice(1000,None)]
    #xslices = [slice(None)]  # uncomment this if you only want to divide things up in the y dimension
    reports = pool.map(operate, itertools.product(yslices, xslices))
    print('\n'.join(reports))
    y, x, r = check(*get_shared_arrays('y', 'x', 'r'))