python中的多线程系统调用

ifr*_*eak 0 python multiprocessing python-multiprocessing

我有一个类似的python脚本:

def test_run():
     global files_dir
     for f1 in os.listdir(files_dir):
          for f2 os.listdir(files_dir):
               os.system("run program x on f1 and f2")
Run Code Online (Sandbox Code Playgroud)

os.system在不同处理器上调用每个呼叫的最佳方法是什么?使用子进程或多处理池?

注意:程序的每次运行都将生成一个输出文件.

Tim*_*ers 6

@ unutbu的答案很好,但是有一个破坏性较小的方法:用a Pool来传递任务.然后你不必捣乱你自己的队列.例如,

import os
NUM_CPUS = None  # defaults to all available

def worker(f1, f2):
    os.system("run program x on f1 and f2")

def test_run(pool):
     filelist = os.listdir(files_dir)
     for f1 in filelist:
          for f2 in filelist:
               pool.apply_async(worker, args=(f1, f2))

if __name__ == "__main__":
     import multiprocessing as mp
     pool = mp.Pool(NUM_CPUS)
     test_run(pool)
     pool.close()
     pool.join()
Run Code Online (Sandbox Code Playgroud)

这看起来更像是你开始使用的代码.不是说这一定是好事;-)

在最新版本的Python 3中,Pool对象也可以用作上下文管理器,因此尾端可以简化为:

if __name__ == "__main__":
     import multiprocessing as mp
     with mp.Pool(NUM_CPUS) as pool:
         test_run(pool)
Run Code Online (Sandbox Code Playgroud)

编辑:改为使用concurrent.futures

对于像这样非常简单的任务,Python 3 concurrent.futures可以更容易使用.从上面替换上面的代码test_run(),如下所示:

def test_run():
     import concurrent.futures as cf
     filelist = os.listdir(files_dir)
     with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
         for f1 in filelist:
             for f2 in filelist:
                 pp.submit(worker, f1, f2)

if __name__ == "__main__":
     test_run()
Run Code Online (Sandbox Code Playgroud)

如果您不希望工作进程中的异常无声地消失,那么它需要更加漂亮.这是所有并行噱头的潜在问题.问题是在主程序中通常没有好的方法来引发异常,因为它们发生在上下文(工作进程)中,这可能与主程序当时正在做的事情无关.在主程序中引发异常(re)的一种方法是明确要求结果; 例如,将以上内容更改为:

def test_run():
     import concurrent.futures as cf
     filelist = os.listdir(files_dir)
     futures = []
     with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
         for f1 in filelist:
             for f2 in filelist:
                 futures.append(pp.submit(worker, f1, f2))
     for future in cf.as_completed(futures):
         future.result()
Run Code Online (Sandbox Code Playgroud)

然后,如果在工作进程中发生future.result()异常,则当它应用于Future表示失败的进程间调用的对象时,将在主程序中重新引发该异常.

可能比你想知道的更多;-)