用于并行进程的Python多处理

Car*_*los 9 python multiprocessing

如果这对某些人来说太简单了,我很抱歉,但我仍然没有得到python的多处理技巧.我已经阅读了
http://docs.python.org/dev/library/multiprocessing
http://pymotw.com/2/multiprocessing/basics.html 以及谷歌给我的许多其他教程和示例......其中很多从这里也是.

好吧,我的情况是我必须计算许多numpy矩阵,然后我需要将它们存储在一个numpy矩阵中.假设我想使用20个核心(或者我可以使用20个核心),但是我还没有成功使用池资源,因为它会使进程保持活动状态直到池"死".所以我想做这样的事情:

from multiprocessing import Process, Queue  
import numpy as np  

def f(q,i):  
     q.put( np.zeros( (4,4) ) ) 

if __name__ == '__main__':   
     q = Queue()   
     for i in range(30):   
          p = Process(target=f, args=(q,))  
          p.start()  
          p.join()  
     result = q.get()  
     while q.empty() == False:
          result += q.get()  
     print result
Run Code Online (Sandbox Code Playgroud)

但是看起来这些进程并不是并行运行的,而是它们按顺序运行(如果我错了,请纠正我)并且我不知道它们是否在计算后死亡(因此对于超过20个进程他们做了他们的部分让核心免费进行另一个过程).另外,对于非常大的数字(比如说100.000),将所有这些矩阵(可能也很大)存储在队列中会占用大量内存,因为这个想法是将每个结果放在每次迭代上在最终结果中,如使用锁(及其acquire()和release()方法),但如果此代码不用于并行处理,则锁也无用...

我希望有人可以帮助我.

提前致谢!

Dav*_*nts 14

你是对的,他们在你的例子中按顺序执行.

p.join()导致当前线程阻塞,直到它完成执行.你要么要单独加入你的程序以外的for循环(例如,通过在列表存储它们,然后再遍历它)或使用类似numpy.Poolapply_async有回调.这也可以让你直接将它添加到你的结果中而不是保持对象.

例如:

def f(i):  
    return i*np.identity(4)

if __name__ == '__main__':
    p=Pool(5)
    result = np.zeros((4,4))
    def adder(value):
        global result
        result += value

    for i in range(30):
        p.apply_async(f, args=(i,), callback=adder)
    p.close()
    p.join()
    print result
Run Code Online (Sandbox Code Playgroud)

关闭然后在最后加入池确保池的进程已完成并且result对象已完成计算.您还可以调查使用Pool.imap作为问题的解决方案.那个特定的解决方案看起来像这样:

if __name__ == '__main__':
    p=Pool(5)
    result = np.zeros((4,4))

    im = p.imap_unordered(f, range(30), chunksize=5)

    for x in im:
        result += x

    print result
Run Code Online (Sandbox Code Playgroud)

这对于您的具体情况来说更干净,但可能不适用于您最终要做的任何事情.

至于存储所有不同的结果,如果我理解你的问题,你可以将它添加到回调方法的结果中(如上所述)或一次性项目使用imap/ imap_unordered(仍然存储结果,但是你会在它构建时清除它.然后,它不需要存储的时间超过添加到结果所需的时间.