ifr*_*eak 0 python multiprocessing python-multiprocessing
我有一个类似的python脚本:
def test_run():
global files_dir
for f1 in os.listdir(files_dir):
for f2 os.listdir(files_dir):
os.system("run program x on f1 and f2")
Run Code Online (Sandbox Code Playgroud)
os.system在不同处理器上调用每个呼叫的最佳方法是什么?使用子进程或多处理池?
注意:程序的每次运行都将生成一个输出文件.
@ unutbu的答案很好,但是有一个破坏性较小的方法:用a Pool来传递任务.然后你不必捣乱你自己的队列.例如,
import os
NUM_CPUS = None # defaults to all available
def worker(f1, f2):
os.system("run program x on f1 and f2")
def test_run(pool):
filelist = os.listdir(files_dir)
for f1 in filelist:
for f2 in filelist:
pool.apply_async(worker, args=(f1, f2))
if __name__ == "__main__":
import multiprocessing as mp
pool = mp.Pool(NUM_CPUS)
test_run(pool)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
这看起来更像是你开始使用的代码.不是说这一定是好事;-)
在最新版本的Python 3中,Pool对象也可以用作上下文管理器,因此尾端可以简化为:
if __name__ == "__main__":
import multiprocessing as mp
with mp.Pool(NUM_CPUS) as pool:
test_run(pool)
Run Code Online (Sandbox Code Playgroud)
编辑:改为使用concurrent.futures
对于像这样非常简单的任务,Python 3 concurrent.futures可以更容易使用.从上面替换上面的代码test_run(),如下所示:
def test_run():
import concurrent.futures as cf
filelist = os.listdir(files_dir)
with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
for f1 in filelist:
for f2 in filelist:
pp.submit(worker, f1, f2)
if __name__ == "__main__":
test_run()
Run Code Online (Sandbox Code Playgroud)
如果您不希望工作进程中的异常无声地消失,那么它需要更加漂亮.这是所有并行噱头的潜在问题.问题是在主程序中通常没有好的方法来引发异常,因为它们发生在上下文(工作进程)中,这可能与主程序当时正在做的事情无关.在主程序中引发异常(re)的一种方法是明确要求结果; 例如,将以上内容更改为:
def test_run():
import concurrent.futures as cf
filelist = os.listdir(files_dir)
futures = []
with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
for f1 in filelist:
for f2 in filelist:
futures.append(pp.submit(worker, f1, f2))
for future in cf.as_completed(futures):
future.result()
Run Code Online (Sandbox Code Playgroud)
然后,如果在工作进程中发生future.result()异常,则当它应用于Future表示失败的进程间调用的对象时,将在主程序中重新引发该异常.
可能比你想知道的更多;-)