利用"Copy-on-Write"将数据复制到Multiprocessing.Pool()工作进程

ale*_*son 11 python memory-management multiprocessing python-multiprocessing

我有multiprocessing一些看起来有点像这样的Python代码:

import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):
    def __init__(self):
        self.myAttribute = np.zeros(100000000) # basically a big memory struct

    def my_multithreaded_analysis(self):
        arg_lists = [(self, i) for i in range(10)]
        pool = Pool(processes=10)
        result = pool.map(call_method, arg_lists)
        print result

    def analyze(self, i):
        time.sleep(10)
        return i ** 2

def call_method(args):
    my_instance, i = args
    return my_instance.analyze(i)


if __name__ == '__main__':
    my_instance = MyClass()
    my_instance.my_multithreaded_analysis()
Run Code Online (Sandbox Code Playgroud)

在阅读了有关内存如何在其他StackOverflow答案(例如Python多处理内存使用情况)中工作的答案后,我认为这不会使用内存与我用于多处理的进程数量成比例,因为它是写时复制和我没有修改任何属性my_instance.但是,当我运行顶部时,我确实看到了所有进程的高内存,它说我的大多数进程都使用了大量内存(这是OSX的最高输出,但我可以在Linux上复制).

我的问题基本上是,我是否正确地解释了这一点,因为我的实例MyClass实际上是在池中重复的?如果是这样,我该如何防止这种情况; 我应该不使用这样的结构吗?我的目标是减少计算分析的内存使用量.

PID   COMMAND      %CPU  TIME     #TH    #WQ  #PORT MEM    PURG   CMPRS  PGRP PPID STATE
2494  Python       0.0   00:01.75 1      0    7     765M   0B     0B     2484 2484 sleeping
2493  Python       0.0   00:01.85 1      0    7     765M   0B     0B     2484 2484 sleeping
2492  Python       0.0   00:01.86 1      0    7     765M   0B     0B     2484 2484 sleeping
2491  Python       0.0   00:01.83 1      0    7     765M   0B     0B     2484 2484 sleeping
2490  Python       0.0   00:01.87 1      0    7     765M   0B     0B     2484 2484 sleeping
2489  Python       0.0   00:01.79 1      0    7     167M   0B     597M   2484 2484 sleeping
2488  Python       0.0   00:01.77 1      0    7     10M    0B     755M   2484 2484 sleeping
2487  Python       0.0   00:01.75 1      0    7     8724K  0B     756M   2484 2484 sleeping
2486  Python       0.0   00:01.78 1      0    7     9968K  0B     755M   2484 2484 sleeping
2485  Python       0.0   00:01.74 1      0    7     171M   0B     594M   2484 2484 sleeping
2484  Python       0.1   00:16.43 4      0    18    775M   0B     12K    2484 2235 sleeping
Run Code Online (Sandbox Code Playgroud)

Sha*_*ger 20

发送到pool.map(和相关方法)的任何内容实际上并不使用共享的写时复制资源.这些值是"pickled"(Python的序列化机制),通过管道发送到工作进程并在那里进行unpickled,从头开始重建子对象.因此,在这种情况下,每个孩子最终都会得到原始数据的写时复制版本(它从未使用过,因为它被告知要使用通过IPC发送的副本),以及原始数据的个人重新创建.在孩子身上重建,不分享.

如果要利用分叉的写时复制优势,则无法通过管道发送数据(或引用数据的对象).您必须将它们存储在可以通过访问自己的全局变量从子项中找到的位置.例如:

import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):
    def __init__(self):
        self.myAttribute = np.zeros(100000000) # basically a big memory struct

    def my_multithreaded_analysis(self):
        arg_lists = list(range(10))  # Don't pass self
        pool = Pool(processes=10)
        result = pool.map(call_method, arg_lists)
        print result

    def analyze(self, i):
        time.sleep(10)
        return i ** 2

def call_method(i):
    # Implicitly use global copy of my_instance, not one passed as an argument
    return my_instance.analyze(i)

# Constructed globally and unconditionally, so the instance exists
# prior to forking in commonly accessible location
my_instance = MyClass()


if __name__ == '__main__':
    my_instance.my_multithreaded_analysis()
Run Code Online (Sandbox Code Playgroud)

通过不传递self,您可以避免复制,只需使用映射到映射的单个全局对象映射到子级.如果您需要多个对象,则可以在创建池之前创建对象实例的全局listdict映射,然后传递可以查找对象的索引或键作为参数的一部分pool.map.然后,worker函数使用索引/键(必须通过IPC腌制并发送给子节点)来查找全局字典中的值(写入时复制映射)(也是写入时复制映射),所以你复制便宜的信息来查找孩子的昂贵数据而不复制它.

  • 另请注意:如果对象很小,即使您没有写入,它们也会被复制.CPython是引用计数,引用计数出现在公共对象头中并且不断更新,只需引用该对象,即使它是一个逻辑上非变异的引用.因此,将写入小对象(以及在同一内存页中分配的所有其他对象),从而进行复制.对于大型对象(您的一亿个元素`numpy`数组),只要您没有写入它,大多数都将保持共享,因为标题只占用许多页面中的一个. (4认同)
  • @dre-hh:[从 3.8 开始,macOS 默认使用 `'spawn'` 方法而不是 `'fork'`,因为 macOS 系统框架不是 `fork` 安全的](https://bugs.python.org /issue33725)。“spawn”的工作方式与“fork”的工作方式“非常”不同(它做了一堆东西来模拟分叉,但 COW 根本不参与)。您始终可以尝试选择“fork”启动方法(如果您在“fork”时机上运气不佳,则可能会导致代码崩溃)。 (2认同)

The*_*inn 7

或者,为了利用分叉的写时复制优势,同时保留一些封装的外观,您可以利用 class-attributes 和 @classmethods 而不是 pureglobals

import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):

    myAttribute = np.zeros(100000000) # basically a big memory struct
    # myAttribute is a class-attribute

    @classmethod
    def my_multithreaded_analysis(cls):
        arg_list = [i for i in range(10)]
        pool = Pool(processes=10)
        result = pool.map(analyze, arg_list)
        print result

    @classmethod
    def analyze(cls, i):
        time.sleep(10)
        # If you wanted, you could access cls.myAttribute w/o worry here.
        return i ** 2

""" We don't need this proxy step !
    def call_method(args):
        my_instance, i = args
        return my_instance.analyze(i)
"""

if __name__ == '__main__':
    my_instance = MyClass()
    # Note that now you can instantiate MyClass anywhere in your app,
    # While still taking advantage of copy-on-write forking
    my_instance.my_multithreaded_analysis()
Run Code Online (Sandbox Code Playgroud)

注 1:是的,我承认class-attributes并且class-methods是美化的全局变量。但它购买了一些封装......

注意 2:arg_lists您可以通过将绑定实例方法传递给,将实例(self)隐式传递给每个创建的任务,而不是显式地创建上面的内容,这样更容易!Poolanalyze(self)Pool.map()