Ind*_*nus 5 multiprocessing python-3.x
底部是我现在拥有的代码。它似乎工作正常。但是,我并不完全理解它。我想如果没有.join(),我会冒着代码在池完成执行之前进入下一个 for 循环的风险。我们不需要那 3 行注释掉吗?
另一方面,如果我要.close()和.join()方式一起去,有没有办法“重新打开”那个关闭的游泳池而不是Pool(6)每次?
import multiprocessing as mp
import random as rdm
from statistics import stdev, mean
import time
def mesh_subset(population, n_chosen=5):
chosen = rdm.choices(population, k=n_chosen)
return mean(chosen)
if __name__ == '__main__':
population = [x for x in range(20)]
N_iteration = 10
start_time = time.time()
pool = mp.Pool(6)
for i in range(N_iteration):
print([round(x,2) for x in population])
print(stdev(population))
# pool = mp.Pool(6)
population = pool.map(mesh_subset, [population]*len(population))
# pool.close()
# pool.join()
print('run time:', time.time() - start_time)
Run Code Online (Sandbox Code Playgroud)
工作人员池的设置成本相对较高,因此应该(如果可能)只设置一次,通常在脚本的开头。
该pool.map命令块,直到所有的任务都完成。毕竟,它返回一个结果列表。除非mesh_subset在所有输入上都被调用并为每个输入返回了结果,否则它无法做到这一点。相比之下,像pool.apply_async这样的方法不会阻塞。apply_async返回带有get方法的 ApplyResult 对象,该方法在从工作进程获得结果之前一直阻塞。
pool.close 将工作处理程序的状态设置为 CLOSE。这会导致处理程序向工作人员发出终止信号。
该pool.join直到所有的工作进程块已被终止。
所以,你不需要调用-其实你不应该叫-pool.close和pool.join,直到你与池完成。一旦工人被发送信号终止(by pool.close),就无法“重新打开”它们。您需要改为启动一个新池。
在您的情况下,由于您确实希望循环等待所有任务完成,因此使用pool.apply_async代替pool.map. 但是,如果您要使用pool.apply_async,则可以通过调用get而不是关闭并重新启动池来获得与以前相同的结果:
# you could do this, but using pool.map is simpler
for i in range(N_iteration):
apply_results = [pool.apply_async(mesh_subset, [population]) for i in range(len(population))]
# the call to result.get() blocks until its worker process (running
# mesh_subset) returns a value
population = [result.get() for result in apply_results]
Run Code Online (Sandbox Code Playgroud)
当循环完成时,len(population)不变。
如果您不希望每个循环在所有任务完成之前阻塞,您可以使用apply_async的callback功能:
N_pop = len(population)
result = []
for i in range(N_iteration):
for i in range(N_pop):
pool.apply_async(mesh_subset, [population]),
callback=result.append)
pool.close()
pool.join()
print(result)
Run Code Online (Sandbox Code Playgroud)
现在,当 anymesh_subset返回 a 时return_value,
result.append(return_value)将调用。对apply_asyncdo的调用不会阻塞,因此N_iteration * N_pop任务会pool一次性全部推送到s 任务队列中。但由于池有 6 个工作人员,mesh_subset因此在任何给定时间最多运行6 个调用
。当工作人员完成任务时,首先完成的工作人员会调用result.append(return_value)。所以中的值result是无序的。这不同于pool.map返回一个列表,该列表的返回值与其对应的参数列表的顺序相同。
除非出现异常,一旦所有任务完成,result最终将包含N_iteration * N_pop返回值。以上,并且被用来等待所有的任务来完成。pool.close()pool.join()
| 归档时间: |
|
| 查看次数: |
3453 次 |
| 最近记录: |