如何重用多处理池?

Ind*_*nus 5 multiprocessing python-3.x

底部是我现在拥有的代码。它似乎工作正常。但是,我并不完全理解它。我想如果没有.join(),我会冒着代码在池完成执行之前进入下一个 for 循环的风险。我们不需要那 3 行注释掉吗?

另一方面,如果我要.close().join()方式一起去,有没有办法“重新打开”那个关闭的游泳池而不是Pool(6)每次?

import multiprocessing as mp
import random as rdm
from statistics import stdev, mean
import time


def mesh_subset(population, n_chosen=5):
    chosen = rdm.choices(population, k=n_chosen)
    return mean(chosen)


if __name__ == '__main__':
    population = [x for x in range(20)]
    N_iteration = 10
    start_time = time.time()
    pool = mp.Pool(6)
    for i in range(N_iteration):
        print([round(x,2) for x in population])
        print(stdev(population))
        # pool = mp.Pool(6)
        population = pool.map(mesh_subset, [population]*len(population))
        # pool.close()
        # pool.join()
    print('run time:', time.time() - start_time)
Run Code Online (Sandbox Code Playgroud)

unu*_*tbu 5

工作人员池的设置成本相对较高,因此应该(如果可能)只设置一次,通常在脚本的开头。

pool.map命令块,直到所有的任务都完成。毕竟,它返回一个结果列表。除非mesh_subset在所有输入上都被调用并为每个输入返回了结果,否则它无法做到这一点。相比之下,像pool.apply_async这样的方法不会阻塞。apply_async返回带有get方法的 ApplyResult 对象,该方法在从工作进程获得结果之前一直阻塞。

pool.close 将工作处理程序的状态设置为 CLOSE。这会导致处理程序向工作人员发出终止信号

pool.join直到所有的工作进程块已被终止。

所以,你不需要调用-其实你不应该叫-pool.closepool.join,直到你与池完成。一旦工人被发送信号终止(by pool.close),就无法“重新打开”它们。您需要改为启动一个新池。


在您的情况下,由于您确实希望循环等待所有任务完成,因此使用pool.apply_async代替pool.map. 但是,如果您要使用pool.apply_async,则可以通过调用get而不是关闭并重新启动池来获得与以前相同的结果:

# you could do this, but using pool.map is simpler
for i in range(N_iteration):
    apply_results = [pool.apply_async(mesh_subset, [population]) for i in range(len(population))]
    # the call to result.get() blocks until its worker process (running
    # mesh_subset) returns a value
    population = [result.get() for result in apply_results]
Run Code Online (Sandbox Code Playgroud)

当循环完成时,len(population)不变。


如果您不希望每个循环在所有任务完成之前阻塞,您可以使用apply_asynccallback功能:

N_pop = len(population)
result = []
for i in range(N_iteration):
    for i in range(N_pop):
        pool.apply_async(mesh_subset, [population]),
                         callback=result.append)
pool.close()
pool.join()
print(result)
Run Code Online (Sandbox Code Playgroud)

现在,当 anymesh_subset返回 a 时return_valueresult.append(return_value)将调用。对apply_asyncdo的调用不会阻塞,因此N_iteration * N_pop任务会pool一次性全部推送到s 任务队列中。但由于池有 6 个工作人员,mesh_subset因此在任何给定时间最多运行6 个调用 。当工作人员完成任务时,首先完成的工作人员会调用result.append(return_value)。所以中的值result是无序的。这不同于pool.map返回一个列表,该列表的返回值与其对应的参数列表的顺序相同。

除非出现异常,一旦所有任务完成result最终将包含N_iteration * N_pop返回值。以上,并且被用来等待所有的任务来完成。pool.close()pool.join()