Python多处理:如何限制等待进程的数量?

And*_*son 9 python pool multiprocessing

当使用Pool.apply_async运行大量任务(具有大参数)时,将分配进程并进入等待状态,并且对等待进程的数量没有限制.这可能最终会占用所有内存,如下例所示:

import multiprocessing
import numpy as np

def f(a,b):
    return np.linalg.solve(a,b)

def test():

    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))
    p.close()
    p.join()

if __name__ == '__main__':
    test()
Run Code Online (Sandbox Code Playgroud)

我正在寻找一种限制等待队列的方法,以便只有有限数量的等待进程,并且在等待队列已满时阻塞Pool.apply_async.

eca*_*mur 6

multiprocessing.Pool有一个_taskqueue类型的成员multiprocessing.Queue,它带有一个可选maxsize参数; 不幸的是,它没有maxsize参数集构造它.

我建议multiprocessing.Pool使用multiprocessing.Pool.__init__传递maxsize_taskqueue构造函数的复制粘贴进行子类化.

猴子修补对象(池或队列)也可以工作,但你必须monkeypatch pool._taskqueue._maxsize,pool._taskqueue._sem所以它会非常脆弱:

pool._taskqueue._maxsize = maxsize
pool._taskqueue._sem = BoundedSemaphore(maxsize)
Run Code Online (Sandbox Code Playgroud)