如何限制多处理过程的范围?

Hoo*_*ked 15 python multiprocessing

使用python的multiprocessing模块,以下设计的示例以最小的内存要求运行:

import multiprocessing 
# completely_unrelated_array = range(2**25)

def foo(x):
    for x in xrange(2**28):pass
    print x**2

P = multiprocessing.Pool()

for x in range(8):
    multiprocessing.Process(target=foo, args=(x,)).start()
Run Code Online (Sandbox Code Playgroud)

取消注释的创建,completely_unrelated_array你会发现每个衍生的进程分配内存为completely_unrelated_array!这是一个更大的项目的最小例子,我无法弄清楚如何解决方法; 多处理似乎可以复制全局的所有内容.我并不需要共享内存对象,我只需要传递x和处理它没有整个程序的内存开销.

侧面观察:有趣的是print id(completely_unrelated_array)内部foo给出相同的值,暗示某些可能不是副本......

dan*_*ano 11

由于其性质os.fork(),__main__模块的全局命名空间中的任何变量都将由子进程继承(假设您在Posix平台上),因此您将看到子进程中的内存使用情况反映出'创造了.我不确定是否所有内存都是真正分配的,据我所知,内存是共享的,直到你真正尝试在子代中更改内存,此时会创建一个新副本.另一方面,Windows不使用os.fork()- 它重新导入每个子节点中的主模块,并挑选要发送给子节点的任何局部变量.因此,使用Windows,您实际上可以通过仅在if __name__ == "__main__":保护中定义它来避免在子项中复制的大型全局结尾,因为该保护内部的所有内容都只会在父进程中运行:

import time
import multiprocessing 


def foo(x):
    for x in range(2**28):pass
    print(x**2)

if __name__ == "__main__":
    completely_unrelated_array = list(range(2**25)) # This will only be defined in the parent on Windows
    P = multiprocessing.Pool()

    for x in range(8):
        multiprocessing.Process(target=foo, args=(x,)).start()
Run Code Online (Sandbox Code Playgroud)

现在,在Python 2.x中,multiprocessing.Process如果您使用的是Posix平台,则只能通过分叉来创建新对象.但是在Python 3.4上,您可以使用上下文指定如何创建新进程.因此,我们可以指定"spawn"上下文(Windows使用的上下文)来创建新进程,并使用相同的技巧:

# Note that this is Python 3.4+ only
import time
import multiprocessing 

def foo(x):
    for x in range(2**28):pass
    print(x**2)


if __name__ == "__main__":
    completely_unrelated_array = list(range(2**23))  # Again, this only exists in the parent
    ctx = multiprocessing.get_context("spawn") # Use process spawning instead of fork
    P = ctx.Pool()

    for x in range(8):
        ctx.Process(target=foo, args=(x,)).start()
Run Code Online (Sandbox Code Playgroud)

如果您需要2.x支持,或者希望坚持使用os.fork()创建新Process对象,我认为您可以做的最好的方法是将报告的内存使用率降低,立即删除子进程中的违规对象:

import time
import multiprocessing 
import gc

def foo(x):
    init()
    for x in range(2**28):pass
    print(x**2)

def init():
    global completely_unrelated_array
    completely_unrelated_array = None
    del completely_unrelated_array
    gc.collect()

if __name__ == "__main__":
    completely_unrelated_array = list(range(2**23))
    P = multiprocessing.Pool(initializer=init)

    for x in range(8):
        multiprocessing.Process(target=foo, args=(x,)).start()
    time.sleep(100)
Run Code Online (Sandbox Code Playgroud)


jta*_*lor 5

这里重要的是您要定位的平台。Unix系统进程是通过使用写时复制(cow)内存创建的。因此,即使每个进程都获得了父进程的完整内存的副本,但在修改该内存时,实际上仅以每页为基础(4KiB)分配该内存。因此,如果您仅针对这些平台,则无需进行任何更改。

如果您要针对没有牛叉的平台,则可能要使用python 3.4及其新的派生上下文spawnforkserver,请参见文档。 这些方法将创建新的进程,这些进程与父进程不共享任何状态或共享有限状态,并且所有内存传递都是显式的。

但这并不是说生成的过程将导入您的模块,因此所有全局数据都将被显式复制,并且无法进行写时复制。为避免这种情况,您必须缩小数据范围。

import multiprocessing  as mp
import numpy as np

def foo(x):
    import time
    time.sleep(60)

if __name__ == "__main__":
    mp.set_start_method('spawn')
    # not global so forks will not have this allocated due to the spawn method
    # if the method would be fork the children would still have this memory allocated
    # but it could be copy-on-write
    completely_unrelated_array = np.ones((5000, 10000))
    P = mp.Pool()
    for x in range(3):
        mp.Process(target=foo, args=(x,)).start()
Run Code Online (Sandbox Code Playgroud)

例如带spawn的top输出:

%MEM     TIME+ COMMAND
29.2   0:00.52 python3                                                
0.5   0:00.00 python3    
0.5   0:00.00 python3    
0.5   0:00.00 python3    
Run Code Online (Sandbox Code Playgroud)

和叉:

%MEM     TIME+ COMMAND
29.2   0:00.52 python3                                                
29.1   0:00.00 python3    
29.1   0:00.00 python3                                                
29.1   0:00.00 python3
Run Code Online (Sandbox Code Playgroud)

请注意,由于写入时复制,其超过100%