Python3:多处理消耗大量RAM并减慢速度

Nat*_*tjo 6 ram multiprocessing python-3.x python-multiprocessing

我启动多个进程以创建新对象列表.htop向我展示了1到4个进程(我总是创建3个新对象).

def foo(self):
    with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool:
        result = pool.map_async(self.new_obj, self.information)
        self.new_objs = result.get()
        pool.terminate()
    gc.collect()
Run Code Online (Sandbox Code Playgroud)

foo()多次调用,每次调用时,整个过程运行得更慢,程序最终都没有完成,因为它减慢到很多.程序开始占用我的所有RAM,而顺序方法没有任何显着的RAM使用.

当我杀死程序时,大部分时间这是程序上次执行的功能.

->File "threading.py", line 293, in wait
    waiter.acquire()
Run Code Online (Sandbox Code Playgroud)

编辑 提供有关我的情况的一些信息.我创建了一个由节点组成的树.foo()由父节点调用以创建其子节点.在result由过程返回的是这些子节点.它们保存在父节点的列表中.我希望并行化这些子节点的创建,而不是以顺序方式创建它们.

val*_*val 2

我认为你的问题主要与你的并行函数是对象的方法有关。如果没有更多信息,很难确定,但请考虑这个小玩具程序:

import multiprocessing as mp
import numpy as np
import gc


class Object(object):
    def __init__(self, _):
        self.data = np.empty((100, 100, 100), dtype=np.float64)


class Container(object):
    def __new__(cls):
        self = object.__new__(cls)
        print("Born")
        return self

    def __init__(self):
        self.objects = []

    def foo(self):
        with mp.Pool(processes=3, maxtasksperchild=10) as pool:
            result = pool.map_async(self.new_obj, range(50))
            self.objects.extend(result.get())
            pool.terminate()
        gc.collect()

    def new_obj(self, i):
        return Object(i)

    def __del__(self):
        print("Dead")


if __name__ == '__main__':
    c = Container()
    for j in range(5):
        c.foo()
Run Code Online (Sandbox Code Playgroud)

NowContainer只被调用一次,所以你会期望看到 a "Born",后面跟着 a"Dead"被打印出来;但由于进程执行的代码是容器的方法,这意味着整个容器必须在其他地方执行!运行此命令,您将看到混合流"Born",并且在每次执行映射"Dead"时都会重建容器:

Born
Born
Born
Born
Born
Dead
Born
Dead
Dead
Born
Dead
Born
... 
<MANY MORE LINES HERE>
...
Born
Dead
Run Code Online (Sandbox Code Playgroud)

为了让自己相信整个容器每次都会被复制和发送,请尝试设置一些不可序列化的值:

def foo(self):
    with mp.Pool(processes=3, maxtasksperchild=10) as pool:
        result = pool.map_async(self.new_obj, range(50))
        self.fn = lambda x: x**2
        self.objects.extend(result.get())
        pool.terminate()
    gc.collect()
Run Code Online (Sandbox Code Playgroud)

这将立即引发一个,AttributeError因为它无法序列化容器。

我们总结一下:当向池发送 1000 个请求时,Container将被序列化、发送到进程并在那里反序列化1000 次。当然,它们最终会被删除(假设没有太多奇怪的交叉引用发生),但这肯定会给 RAM 带来很大的压力,因为对象被序列化、调用、更新、重新序列化......对于每个映射输入中的元素。

你怎么解决这个问题?好吧,理想情况下,不要共享状态:

def new_obj(_):
    return Object(_)


class Container(object):
    def __new__(cls):
        self = object.__new__(cls)
        print("Born")
        return self

    def __init__(self):
        self.objects = []

    def foo(self):
        with mp.Pool(processes=3, maxtasksperchild=10) as pool:
            result = pool.map_async(new_obj, range(50))
            self.objects.extend(result.get())
            pool.terminate()
        gc.collect()

    def __del__(self):
        print("Dead")
Run Code Online (Sandbox Code Playgroud)

这个过程只需一小部分时间即可完成,并且只在 RAM 上产生最小的飞艇(作为单个飞艇Container的构建)。如果您需要将一些内部状态传递到那里,请将其提取并发送:

def new_obj(tup):
    very_important_state, parameters = tup
    return Object(very_important_state=very_important_state,
                  parameters=parameters)


class Container(object):
    def __new__(cls):
        self = object.__new__(cls)
        print("Born")
        return self

    def __init__(self):
        self.objects = []

    def foo(self):
        important_state = len(self.objects)
        with mp.Pool(processes=3, maxtasksperchild=10) as pool:
            result = pool.map_async(new_obj,
                                    ((important_state, i) for i in range(50)))
            self.objects.extend(result.get())
            pool.terminate()
        gc.collect()

    def __del__(self):
        print("Dead")
Run Code Online (Sandbox Code Playgroud)

这与以前的行为相同。如果您绝对无法避免在进程之间共享某些可变状态,请检查多处理工具来执行此操作,而不必每次都复制所有内容。