Nat*_*tjo 6 ram multiprocessing python-3.x python-multiprocessing
我启动多个进程以创建新对象列表.htop向我展示了1到4个进程(我总是创建3个新对象).
def foo(self):
with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, self.information)
self.new_objs = result.get()
pool.terminate()
gc.collect()
Run Code Online (Sandbox Code Playgroud)
我foo()多次调用,每次调用时,整个过程运行得更慢,程序最终都没有完成,因为它减慢到很多.程序开始占用我的所有RAM,而顺序方法没有任何显着的RAM使用.
当我杀死程序时,大部分时间这是程序上次执行的功能.
->File "threading.py", line 293, in wait
waiter.acquire()
Run Code Online (Sandbox Code Playgroud)
编辑
提供有关我的情况的一些信息.我创建了一个由节点组成的树.foo()由父节点调用以创建其子节点.在result由过程返回的是这些子节点.它们保存在父节点的列表中.我希望并行化这些子节点的创建,而不是以顺序方式创建它们.
我认为你的问题主要与你的并行函数是对象的方法有关。如果没有更多信息,很难确定,但请考虑这个小玩具程序:
import multiprocessing as mp
import numpy as np
import gc
class Object(object):
def __init__(self, _):
self.data = np.empty((100, 100, 100), dtype=np.float64)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def new_obj(self, i):
return Object(i)
def __del__(self):
print("Dead")
if __name__ == '__main__':
c = Container()
for j in range(5):
c.foo()
Run Code Online (Sandbox Code Playgroud)
NowContainer只被调用一次,所以你会期望看到 a "Born",后面跟着 a"Dead"被打印出来;但由于进程执行的代码是容器的方法,这意味着整个容器必须在其他地方执行!运行此命令,您将看到混合流"Born",并且在每次执行映射"Dead"时都会重建容器:
Born
Born
Born
Born
Born
Dead
Born
Dead
Dead
Born
Dead
Born
...
<MANY MORE LINES HERE>
...
Born
Dead
Run Code Online (Sandbox Code Playgroud)
为了让自己相信整个容器每次都会被复制和发送,请尝试设置一些不可序列化的值:
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.fn = lambda x: x**2
self.objects.extend(result.get())
pool.terminate()
gc.collect()
Run Code Online (Sandbox Code Playgroud)
这将立即引发一个,AttributeError因为它无法序列化容器。
我们总结一下:当向池发送 1000 个请求时,Container将被序列化、发送到进程并在那里反序列化1000 次。当然,它们最终会被删除(假设没有太多奇怪的交叉引用发生),但这肯定会给 RAM 带来很大的压力,因为对象被序列化、调用、更新、重新序列化......对于每个映射输入中的元素。
你怎么解决这个问题?好吧,理想情况下,不要共享状态:
def new_obj(_):
return Object(_)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
Run Code Online (Sandbox Code Playgroud)
这个过程只需一小部分时间即可完成,并且只在 RAM 上产生最小的飞艇(作为单个飞艇Container的构建)。如果您需要将一些内部状态传递到那里,请将其提取并发送:
def new_obj(tup):
very_important_state, parameters = tup
return Object(very_important_state=very_important_state,
parameters=parameters)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
important_state = len(self.objects)
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj,
((important_state, i) for i in range(50)))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
Run Code Online (Sandbox Code Playgroud)
这与以前的行为相同。如果您绝对无法避免在进程之间共享某些可变状态,请检查多处理工具来执行此操作,而不必每次都复制所有内容。
| 归档时间: |
|
| 查看次数: |
564 次 |
| 最近记录: |