Ven*_*tta 112 python parallel-processing numpy shared-memory multiprocessing
假设我有一个大内存numpy数组,我有一个函数func
,它接受这个巨大的数组作为输入(连同一些其他参数).func
具有不同参数可以并行运行.例如:
def func(arr, param):
# do stuff to arr, param
# build array arr
pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
Run Code Online (Sandbox Code Playgroud)
如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中.
有没有办法让不同的进程共享同一个数组?此数组对象是只读的,永远不会被修改.
更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法分享它?
[EDITED]
我读了答案,但我仍然有点困惑.由于fork()是copy-on-write,因此在python多处理库中生成新进程时不应调用任何额外的成本.但是下面的代码表明存在巨大的开销:
from multiprocessing import Pool, Manager
import numpy as np;
import time
def f(arr):
return len(arr)
t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;
pool = Pool(processes = 6)
t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;
Run Code Online (Sandbox Code Playgroud)
输出(顺便说一下,随着数组大小的增加,成本也会增加,所以我怀疑仍有与内存复制相关的开销):
construct array = 0.0178790092468
multiprocessing overhead = 0.252444982529
Run Code Online (Sandbox Code Playgroud)
如果我们不复制数组,为什么会有这么大的开销?共享内存拯救了我的哪一部分?
Fra*_*ila 110
如果您使用的是使用写时复制fork()
语义的操作系统(如任何常见的unix),那么只要您永远不会改变您的数据结构,它就可供所有子进程使用而不占用额外的内存.你不必做任何特别的事情(除了绝对确保你不改变对象).
您可以为您的问题做的最有效的事情是将您的数组打包到一个有效的数组结构(使用numpy
或array
),将其放在共享内存中,包装它multiprocessing.Array
,并将其传递给您的函数.这个答案显示了如何做到这一点.
如果你想要一个可写的共享对象,那么你需要用某种同步或锁定来包装它.multiprocessing
提供了两种执行此操作的方法:一种使用共享内存(适用于简单值,数组或ctypes)或Manager
代理,其中一个进程保存内存,管理器仲裁从其他进程(甚至通过网络)对它的访问.
该Manager
方法可以与任意Python对象一起使用,但是比使用共享内存的等效方法慢,因为对象需要被序列化/反序列化并在进程之间发送.
Python中提供了大量并行处理库和方法.multiprocessing
是一个优秀的,全面的图书馆,但如果您有特殊需求,也许其他方法可能会更好.
小智 17
我遇到了同样的问题,写了一个小的共享内存实用程序类来解决它.
我正在使用multiprocessing.RawArray(lockfree),并且对数组的访问根本不同步(lockfree),小心不要自己动手.
通过该解决方案,我在四核i7上获得了大约3倍的加速.
这是代码:随意使用和改进它,请报告任何错误.
'''
Created on 14.05.2013
@author: martin
'''
import multiprocessing
import ctypes
import numpy as np
class SharedNumpyMemManagerError(Exception):
pass
'''
Singleton Pattern
'''
class SharedNumpyMemManager:
_initSize = 1024
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(SharedNumpyMemManager, cls).__new__(
cls, *args, **kwargs)
return cls._instance
def __init__(self):
self.lock = multiprocessing.Lock()
self.cur = 0
self.cnt = 0
self.shared_arrays = [None] * SharedNumpyMemManager._initSize
def __createArray(self, dimensions, ctype=ctypes.c_double):
self.lock.acquire()
# double size if necessary
if (self.cnt >= len(self.shared_arrays)):
self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)
# next handle
self.__getNextFreeHdl()
# create array in shared memory segment
shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))
# convert to numpy array vie ctypeslib
self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)
# do a reshape for correct dimensions
# Returns a masked array containing the same data, but with a new shape.
# The result is a view on the original array
self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)
# update cnt
self.cnt += 1
self.lock.release()
# return handle to the shared memory numpy array
return self.cur
def __getNextFreeHdl(self):
orgCur = self.cur
while self.shared_arrays[self.cur] is not None:
self.cur = (self.cur + 1) % len(self.shared_arrays)
if orgCur == self.cur:
raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')
def __freeArray(self, hdl):
self.lock.acquire()
# set reference to None
if self.shared_arrays[hdl] is not None: # consider multiple calls to free
self.shared_arrays[hdl] = None
self.cnt -= 1
self.lock.release()
def __getArray(self, i):
return self.shared_arrays[i]
@staticmethod
def getInstance():
if not SharedNumpyMemManager._instance:
SharedNumpyMemManager._instance = SharedNumpyMemManager()
return SharedNumpyMemManager._instance
@staticmethod
def createArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)
@staticmethod
def getArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)
@staticmethod
def freeArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)
# Init Singleton on module load
SharedNumpyMemManager.getInstance()
if __name__ == '__main__':
import timeit
N_PROC = 8
INNER_LOOP = 10000
N = 1000
def propagate(t):
i, shm_hdl, evidence = t
a = SharedNumpyMemManager.getArray(shm_hdl)
for j in range(INNER_LOOP):
a[i] = i
class Parallel_Dummy_PF:
def __init__(self, N):
self.N = N
self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)
self.pool = multiprocessing.Pool(processes=N_PROC)
def update_par(self, evidence):
self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))
def update_seq(self, evidence):
for i in range(self.N):
propagate((i, self.arrayHdl, evidence))
def getArray(self):
return SharedNumpyMemManager.getArray(self.arrayHdl)
def parallelExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_par(5)
print(pf.getArray())
def sequentialExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_seq(5)
print(pf.getArray())
t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")
print("Sequential: ", t1.timeit(number=1))
print("Parallel: ", t2.timeit(number=1))
Run Code Online (Sandbox Code Playgroud)
这是Ray的预期用例,这是一个用于并行和分布式Python的库。在后台,它使用Apache Arrow数据布局(零副本格式)序列化对象,并将其存储在共享内存对象存储中,这样多个进程可以访问它们而无需创建副本。
该代码如下所示。
import numpy as np
import ray
ray.init()
@ray.remote
def func(array, param):
# Do stuff.
return 1
array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)
result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)
Run Code Online (Sandbox Code Playgroud)
如果您不调用ray.put
该数组,则该数组仍将存储在共享内存中,但是每次调用都会完成一次func
,这不是您想要的。
请注意,这不仅适用于数组,而且适用于包含数组的对象,例如,将int映射到数组的字典如下所示。
您可以通过在IPython中运行以下代码来比较Ray和pickle中的序列化性能。
import numpy as np
import pickle
import ray
ray.init()
x = {i: np.ones(10**7) for i in range(20)}
# Time Ray.
%time x_id = ray.put(x) # 2.4s
%time new_x = ray.get(x_id) # 0.00073s
# Time pickle.
%time serialized = pickle.dumps(x) # 2.6s
%time deserialized = pickle.loads(serialized) # 1.9s
Run Code Online (Sandbox Code Playgroud)
使用Ray进行序列化仅比pickle快一点,但是由于使用了共享内存,反序列化的速度要快1000倍(此数字当然取决于对象)。
请参阅Ray文档。您可以阅读更多有关使用Ray和Arrow进行快速序列化的信息。注意我是Ray开发人员之一。
就像 Robert Nishihara 提到的那样,Apache Arrow 使这一切变得简单,特别是使用 Plasma 内存对象存储,这是 Ray 的构建基础。
出于这个原因,我专门制作了脑等离子体- 在 Flask 应用程序中快速加载和重新加载大对象。它是 Apache Arrow 可序列化对象的共享内存对象命名空间,包括pickle
由pickle.dumps(...)
.
Apache Ray 和 Plasma 的主要区别在于它为您跟踪对象 ID。在本地运行的任何进程或线程或程序都可以通过从任何Brain
对象调用名称来共享变量的值。
$ pip install brain-plasma
Run Code Online (Sandbox Code Playgroud)
$ plasma_store -m 10000000 -s /tmp/plasma
from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/')
brain['a'] = [1]*10000
brain['a']
# >>> [1,1,1,1,...]
Run Code Online (Sandbox Code Playgroud)