Car*_*los 9 python multiprocessing
如果这对某些人来说太简单了,我很抱歉,但我仍然没有得到python的多处理技巧.我已经阅读了
http://docs.python.org/dev/library/multiprocessing
http://pymotw.com/2/multiprocessing/basics.html
以及谷歌给我的许多其他教程和示例......其中很多从这里也是.
好吧,我的情况是我必须计算许多numpy矩阵,然后我需要将它们存储在一个numpy矩阵中.假设我想使用20个核心(或者我可以使用20个核心),但是我还没有成功使用池资源,因为它会使进程保持活动状态直到池"死".所以我想做这样的事情:
from multiprocessing import Process, Queue
import numpy as np
def f(q,i):
q.put( np.zeros( (4,4) ) )
if __name__ == '__main__':
q = Queue()
for i in range(30):
p = Process(target=f, args=(q,))
p.start()
p.join()
result = q.get()
while q.empty() == False:
result += q.get()
print result
Run Code Online (Sandbox Code Playgroud)
但是看起来这些进程并不是并行运行的,而是它们按顺序运行(如果我错了,请纠正我)并且我不知道它们是否在计算后死亡(因此对于超过20个进程他们做了他们的部分让核心免费进行另一个过程).另外,对于非常大的数字(比如说100.000),将所有这些矩阵(可能也很大)存储在队列中会占用大量内存,因为这个想法是将每个结果放在每次迭代上在最终结果中,如使用锁(及其acquire()和release()方法),但如果此代码不用于并行处理,则锁也无用...
我希望有人可以帮助我.
提前致谢!
Dav*_*nts 14
你是对的,他们在你的例子中按顺序执行.
p.join()导致当前线程阻塞,直到它完成执行.你要么要单独加入你的程序以外的for循环(例如,通过在列表存储它们,然后再遍历它)或使用类似numpy.Pool并apply_async有回调.这也可以让你直接将它添加到你的结果中而不是保持对象.
例如:
def f(i):
return i*np.identity(4)
if __name__ == '__main__':
p=Pool(5)
result = np.zeros((4,4))
def adder(value):
global result
result += value
for i in range(30):
p.apply_async(f, args=(i,), callback=adder)
p.close()
p.join()
print result
Run Code Online (Sandbox Code Playgroud)
关闭然后在最后加入池确保池的进程已完成并且result对象已完成计算.您还可以调查使用Pool.imap作为问题的解决方案.那个特定的解决方案看起来像这样:
if __name__ == '__main__':
p=Pool(5)
result = np.zeros((4,4))
im = p.imap_unordered(f, range(30), chunksize=5)
for x in im:
result += x
print result
Run Code Online (Sandbox Code Playgroud)
这对于您的具体情况来说更干净,但可能不适用于您最终要做的任何事情.
至于存储所有不同的结果,如果我理解你的问题,你可以将它添加到回调方法的结果中(如上所述)或一次性项目使用imap/ imap_unordered(仍然存储结果,但是你会在它构建时清除它.然后,它不需要存储的时间超过添加到结果所需的时间.
| 归档时间: |
|
| 查看次数: |
12603 次 |
| 最近记录: |