Par*_*and 97 python multiprocessing
我有以下设置:
do_some_processing(filename):
for line in file(filename):
if line.split(',')[0] in big_lookup_object:
# something here
if __name__ == '__main__':
big_lookup_object = marshal.load('file.bin')
pool = Pool(processes=4)
print pool.map(do_some_processing, glob.glob('*.data'))
Run Code Online (Sandbox Code Playgroud)
我正在将一些大对象加载到内存中,然后创建一个需要使用该大对象的工作池.大对象以只读方式访问,我不需要在进程之间传递它的修改.
我的问题是:加载到共享内存中的大对象,如果我在unix/c中生成进程,或者每个进程是否加载了自己的大对象副本?
更新:进一步澄清 - big_lookup_object是一个共享查找对象.我不需要拆分它并单独处理它.我需要保留一份副本.我需要分割它的工作是读取许多其他大文件,并在查找对象中查找这些大文件中的项目.
进一步更新:数据库是一个很好的解决方案,memcached可能是一个更好的解决方案,磁盘上的文件(shelve或dbm)可能更好.在这个问题中,我对内存解决方案特别感兴趣.对于最终的解决方案,我将使用hadoop,但我想看看我是否也可以拥有本地内存版本.
S.L*_*ott 48
"通过程序中先前创建的多处理共享对象生成子进程吗?"
没有.
进程具有独立的内存空间.
解决方案1
为了充分利用拥有大量工人的大型结构,请执行此操作.
将每个worker写为"过滤器" - 从stdin读取中间结果,确实有效,在stdout上写入中间结果.
将所有工人连接为管道:
process1 <source | process2 | process3 | ... | processn >result
Run Code Online (Sandbox Code Playgroud)每个进程读取,执行和写入.
由于所有进程同时运行,因此效率非常高.写入和读取直接通过进程之间的共享缓冲区.
解决方案2
在某些情况下,您有一个更复杂的结构 - 通常是"扇出"结构.在这种情况下,您的父母有多个孩子.
父打开源数据.父母分叉了许多孩子.
父读取源,将源的一部分存储到每个并发运行的子节点.
当父级到达末尾时,关闭管道.孩子得到文件结束并正常完成.
儿童部分写得愉快,因为每个孩子只是阅读sys.stdin.
父母在产生所有孩子并正确保留管道方面有一些花哨的步法,但这并不算太糟糕.
扇入是相反的结构.许多独立运行的进程需要将它们的输入交织到一个共同的进程中.收集器不容易编写,因为它必须从许多来源读取.
从许多命名管道读取通常使用该select模块来查看哪些管道具有待处理输入.
解决方案3
共享查找是数据库的定义.
解决方案3A - 加载数据库.让工人处理数据库中的数据.
解决方案3B - 使用werkzeug(或类似)创建一个非常简单的服务器,以提供响应HTTP GET的WSGI应用程序,以便工作人员可以查询服务器.
解决方案4
共享文件系统对象.Unix OS提供共享内存对象.这些只是映射到内存的文件,因此交换I/O而不是更多的常规缓冲读取.
您可以通过多种方式从Python上下文中执行此操作
编写一个启动程序,(1)将原来的巨大物体分解成较小的物体,(2)启动工人,每个人都有一个较小的物体.较小的对象可以是pickle Python对象,以节省一点文件读取时间.
编写一个启动程序,它(1)读取原始巨大的对象,并使用seek操作编写页面结构的字节编码文件,以确保通过简单的搜索很容易找到各个部分.这就是数据库引擎的作用 - 将数据分解为页面,使每个页面易于通过seek.
Spawn worker可以访问这个大型页面结构文件.每个工人都可以寻找相关部件并在那里开展工作.
jfs*_*jfs 36
这取决于.对于全局只读变量,通常可以这样考虑(除了消耗的内存),否则它不应该这样.
多处理的文档说:
Better to inherit than pickle/unpickle
在Windows上,需要对多处理中的许多类型进行选择,以便子进程可以使用它们.但是,通常应避免使用管道或队列将共享对象发送到其他进程.相反,您应该安排程序,以便需要访问其他地方创建的共享资源的进程可以从祖先进程继承它.
Explicitly pass resources to child processes
在Unix上,子进程可以使用全局资源在父进程中创建的共享资源.但是,最好将对象作为参数传递给子进程的构造函数.
除了使代码(可能)与Windows兼容之外,这还确保只要子进程仍处于活动状态,对象就不会在父进程中被垃圾回收.如果在父进程中对对象进行垃圾回收时释放某些资源,这可能很重要.
Global variables
请记住,如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有)可能与调用Process.start()时父进程中的值不同.
在Windows(单CPU)上:
#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool
x = 23000 # replace `23` due to small integers share representation
z = [] # integers are immutable, let's try mutable object
def printx(y):
global x
if y == 3:
x = -x
z.append(y)
print os.getpid(), x, id(x), z, id(z)
print y
if len(sys.argv) == 2 and sys.argv[1] == "sleep":
time.sleep(.1) # should make more apparant the effect
if __name__ == '__main__':
pool = Pool(processes=4)
pool.map(printx, (1,2,3,4))
Run Code Online (Sandbox Code Playgroud)
用sleep:
$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4
Run Code Online (Sandbox Code Playgroud)
没有sleep:
$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4
Run Code Online (Sandbox Code Playgroud)
Jar*_*die 26
S.Lott是对的.Python的多处理快捷方式有效地为您提供了一个独立的,重复的内存块.
在大多数*nix系统上,使用较低级别的调用os.fork()实际上会为您提供写时复制内存,这可能就是您的想法.理论上,AFAIK在最简单的程序中,您可以从该数据中读取而不会重复.
但是,Python解释器中的事情并不那么简单.对象数据和元数据存储在同一个内存段中,因此即使对象永远不会发生变化,类似于该对象的引用计数器递增也会导致内存写入,从而导致复制.几乎所有不仅仅是"打印'hello'"的Python程序都会导致引用计数增量,因此您可能永远不会意识到copy-on-write的好处.
即使有人设法破解了Python中的共享内存解决方案,尝试协调跨进程的垃圾收集也可能非常痛苦.
如果你在Unix下运行,它们可能共享同一个对象,因为fork工作原理(即子进程有单独的内存,但是它是copy-on-write,所以只要没有人修改它就可以共享它).我尝试了以下方法:
import multiprocessing
x = 23
def printx(y):
print x, id(x)
print y
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
pool.map(printx, (1,2,3,4))
Run Code Online (Sandbox Code Playgroud)
得到以下输出:
$ ./mtest.py 23 22995656 1 23 22995656 2 23 22995656 3 23 22995656 4
当然,这并不能证明副本尚未完成,但您应该能够通过查看输出来验证您的情况,ps以查看每个子进程使用的实际内存量.