假设我有一个大内存numpy数组,我有一个函数func,它接受这个巨大的数组作为输入(连同一些其他参数).func具有不同参数可以并行运行.例如:
def func(arr, param):
# do stuff to arr, param
# build array arr
pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
Run Code Online (Sandbox Code Playgroud)
如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中.
有没有办法让不同的进程共享同一个数组?此数组对象是只读的,永远不会被修改.
更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法分享它?
[EDITED]
我读了答案,但我仍然有点困惑.由于fork()是copy-on-write,因此在python多处理库中生成新进程时不应调用任何额外的成本.但是下面的代码表明存在巨大的开销:
from multiprocessing import Pool, Manager
import numpy as np;
import time
def f(arr):
return len(arr)
t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;
pool = Pool(processes = 6)
t = …Run Code Online (Sandbox Code Playgroud) python parallel-processing numpy shared-memory multiprocessing
我想在共享内存中使用numpy数组与多处理模块一起使用.困难是使用它像一个numpy数组,而不仅仅是一个ctypes数组.
from multiprocessing import Process, Array
import scipy
def f(a):
a[0] = -a[0]
if __name__ == '__main__':
# Create the array
N = int(10)
unshared_arr = scipy.rand(N)
arr = Array('d', unshared_arr)
print "Originally, the first two elements of arr = %s"%(arr[:2])
# Create, start, and finish the child processes
p = Process(target=f, args=(arr,))
p.start()
p.join()
# Printing out the changed values
print "Now, the first two elements of arr = %s"%arr[:2]
Run Code Online (Sandbox Code Playgroud)
这会产生如下输出:
Originally, the first two elements of arr = …Run Code Online (Sandbox Code Playgroud) 我有一个使用Twisted的xmlrpc服务器.服务器有大量的数据存储在内存中.是否可以运行一个辅助的单独的xmlrpc服务器,它可以访问第一个服务器中的内存中的对象?
因此,serverA启动并创建一个对象.serverB启动并可以从serverA中的对象读取.
*编辑*
要共享的数据是一百万个元组的列表.
有没有一种很好的方法可以在不使用磁盘的情况下在两个python子进程之间传递大量数据?这是我希望完成的动画示例:
import sys, subprocess, numpy
cmdString = """
import sys, numpy
done = False
while not done:
cmd = raw_input()
if cmd == 'done':
done = True
elif cmd == 'data':
##Fake data. In real life, get data from hardware.
data = numpy.zeros(1000000, dtype=numpy.uint8)
data.dump('data.pkl')
sys.stdout.write('data.pkl' + '\\n')
sys.stdout.flush()"""
proc = subprocess.Popen( #python vs. pythonw on Windows?
[sys.executable, '-c %s'%cmdString],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
for i in range(3):
proc.stdin.write('data\n')
print proc.stdout.readline().rstrip()
a = numpy.load('data.pkl')
print a.shape
proc.stdin.write('done\n')
Run Code Online (Sandbox Code Playgroud)
这将创建一个子进程,该子进程生成numpy数组并将数组保存到磁盘.然后父进程从磁盘加载数组.有用!
问题是,我们的硬件可以生成比磁盘可读/写快10倍的数据.有没有办法将数据从一个python进程传输到另一个纯内存中,甚至可能没有复制数据?我可以做一些像传递参考的东西吗?
我第一次尝试纯粹在内存中传输数据是非常糟糕的:
import …Run Code Online (Sandbox Code Playgroud) 我正在使用Python的multiprocessing模块并行处理大型numpy数组.numpy.load(mmap_mode='r')在主进程中使用内存映射数组.在那之后,multiprocessing.Pool()分叉过程(我推测).
一切似乎都很好,除了我得到的行:
AttributeError("'NoneType' object has no attribute 'tell'",)
in `<bound method memmap.__del__ of
memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)>`
ignored
Run Code Online (Sandbox Code Playgroud)
在unittest日志中.尽管如此,测试仍然没有通过.
知道那里发生了什么吗?
使用Python 2.7.2,OS X,NumPy 1.6.1.
更新:
经过一些调试后,我把原因归结为一个代码路径,该代码路径使用这个内存映射的numpy数组(一小部分)作为Pool.imap调用的输入.
显然,"问题"是multiprocessing.Pool.imap通过将输入传递给新进程的方式:它使用pickle.这不适用于mmaped numpy数组,而内部的某些内容会导致错误.
我发现Robert Kern的回复似乎解决了同样的问题.他建议为imap输入来自内存映射数组时创建一个特殊的代码路径:在生成的进程中手动映射同一个数组.
这将是如此复杂和丑陋,我宁愿忍受错误和额外的内存副本.有没有其他方法可以更轻松地修改现有代码?
简而言之
我想同时更改复杂的python对象,每个对象只由一个进程处理.我怎么能这样做(效率最高)?实施某种酸洗支持会有帮助吗?这会有效吗?
完整的问题
我有一个python数据结构ArrayDict,基本上由一个numpy数组和一个字典组成,并将任意索引映射到数组中的行.在我的例子中,所有键都是整数.
a = ArrayDict()
a[1234] = 12.5
a[10] = 3
print(a[1234]) #12.5
print(a[10]) # 3.0
print(a[1234] == a.array[a.indexDict[1234]]) #true
Run Code Online (Sandbox Code Playgroud)
现在我有多个这样的ArrayDicts并希望填写它们myMethod(arrayDict, params).由于myMethod价格昂贵,我想并行运行它.请注意,myMethod可能会添加许多行arrayDict.每个过程都会改变自己的过程ArrayDict.我不需要并发访问ArrayDicts.
在myMethod,我更改了条目arrayDict(即,我更改了内部numpy数组),我添加了条目arrayDict(也就是说,我向字典添加另一个索引并在内部数组中写入一个新值).最终,我希望能够在arrayDict内部numpy阵列变得太小时进行交换.这不会经常发生,如果没有更好的解决方案,我可以在程序的非并行部分执行此操作.即使没有阵列交换,我自己的尝试也没有成功.
我花了几天时间研究共享内存和python的多处理模块.由于我最终将在linux上工作,因此任务似乎相当简单:系统调用fork()允许有效地处理参数的副本.我的想法是ArrayDict在自己的进程中更改每个,返回对象的更改版本,并覆盖原始对象.为了节省内存并保存复制工作,我还使用了sharedmem数组来存储数据ArrayDict.我知道字典必须仍然被复制.
from sharedmem import sharedmem
import numpy as np
n = ... # length of …Run Code Online (Sandbox Code Playgroud) 我正在尝试为此循环实现多处理.它无法修改数组,或者似乎没有正确排序作业(在最后一个函数完成之前返回数组).
import multiprocessing
import numpy
def func(i, array):
array[i] = i**2
print(i**2)
def main(n):
array = numpy.zeros(n)
if __name__ == '__main__':
jobs = []
for i in range(0, n):
p = multiprocessing.Process(target=func, args=(i, array))
jobs.append(p)
p.start()
return array
print(main(10))
Run Code Online (Sandbox Code Playgroud)