我想在共享内存中使用numpy数组与多处理模块一起使用.困难是使用它像一个numpy数组,而不仅仅是一个ctypes数组.
from multiprocessing import Process, Array
import scipy
def f(a):
a[0] = -a[0]
if __name__ == '__main__':
# Create the array
N = int(10)
unshared_arr = scipy.rand(N)
arr = Array('d', unshared_arr)
print "Originally, the first two elements of arr = %s"%(arr[:2])
# Create, start, and finish the child processes
p = Process(target=f, args=(arr,))
p.start()
p.join()
# Printing out the changed values
print "Now, the first two elements of arr = %s"%arr[:2]
Run Code Online (Sandbox Code Playgroud)
这会产生如下输出:
Originally, the first two elements of arr = …Run Code Online (Sandbox Code Playgroud) 我有一个多处理工作,我正在排队只读numpy数组,作为生产者消费者管道的一部分.
目前他们正在被腌制,因为这是默认行为multiprocessing.Queue会降低性能.
是否有任何pythonic方法将引用传递给共享内存而不是pickle数组?
不幸的是,在消费者启动之后会生成数组,并且没有简单的方法.(所以全局变量方法会很难看......).
[注意,在下面的代码中,我们不期望并行计算h(x0)和h(x1).相反,我们看到并行计算的h(x0)和g(h(x1))(就像CPU中的流水线一样).
from multiprocessing import Process, Queue
import numpy as np
class __EndToken(object):
pass
def parrallel_pipeline(buffer_size=50):
def parrallel_pipeline_with_args(f):
def consumer(xs, q):
for x in xs:
q.put(x)
q.put(__EndToken())
def parallel_generator(f_xs):
q = Queue(buffer_size)
consumer_process = Process(target=consumer,args=(f_xs,q,))
consumer_process.start()
while True:
x = q.get()
if isinstance(x, __EndToken):
break
yield x
def f_wrapper(xs):
return parallel_generator(f(xs))
return f_wrapper
return parrallel_pipeline_with_args
@parrallel_pipeline(3)
def f(xs):
for x in xs:
yield x + 1.0
@parrallel_pipeline(3)
def g(xs):
for x in xs:
yield x …Run Code Online (Sandbox Code Playgroud)