并行运行多个系统命令

mic*_*hal 30 python

我编写了一个简单的脚本,对一系列文件执行系统命令.为了加快速度,我想并行运行它们,但不是一次全部运行 - 我需要控制同时运行的命令的最大数量.什么是最简单的方法来解决这个问题?

Sve*_*ach 26

如果你正在调用子进程,我认为不需要使用线程池.使用该subprocess模块的基本实现将是

import subprocess
import os
import time

files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5

for name in files:
    processes.add(subprocess.Popen([command, name]))
    if len(processes) >= max_processes:
        os.wait()
        processes.difference_update([
            p for p in processes if p.poll() is not None])
Run Code Online (Sandbox Code Playgroud)

在Windows上,os.wait()不可用(也没有任何其他等待任何子进程终止的方法).您可以通过在特定时间间隔内轮询来解决此问题:

for name in files:
    processes.add(subprocess.Popen([command, name]))
    while len(processes) >= max_processes:
        time.sleep(.1)
        processes.difference_update([
            p for p in processes if p.poll() is not None])
Run Code Online (Sandbox Code Playgroud)

睡眠时间取决于子过程的预期执行时间.


Thu*_*ner 16

Sven Marnach的回答几乎是对的,但是有一个问题.如果最后一个max_processes进程结束,主程序将尝试启动另一个进程,for循环将结束.这将关闭主进程,进而可以关闭子进程.对我来说,这种行为发生在screen命令中.

Linux中的代码将是这样的(并且只适用于python2.7):

import subprocess
import os
import time

files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5

for name in files:
    processes.add(subprocess.Popen([command, name]))
    if len(processes) >= max_processes:
        os.wait()
        processes.difference_update(
            [p for p in processes if p.poll() is not None])
#Check if all the child processes were closed
for p in processes:
    if p.poll() is None:
        p.wait()
Run Code Online (Sandbox Code Playgroud)

  • 我认为您应该删除它,然后通过编辑将其添加到Sven的答案中。这是不好的形式吗? (2认同)
  • "荣耀归于那些回答" (2认同)

And*_*son 8

您需要将Semaphore对象与线程结合在一起。信号量是一个对象,可让您限制在给定代码段中运行的线程数。在这种情况下,我们将使用信号量来限制可以运行os.system调用的线程数。

首先,我们导入所需的模块:

#!/usr/bin/python

import threading
import os
Run Code Online (Sandbox Code Playgroud)

接下来,我们创建一个信号量对象。这里的数字四是一次可以获取信号量的线程数。这限制了可以一次运行的子流程的数量。

semaphore = threading.Semaphore(4)
Run Code Online (Sandbox Code Playgroud)

该函数只是将对子过程的调用包装在对信号量的调用中。

def run_command(cmd):
    semaphore.acquire()
    try:
        os.system(cmd)
    finally:
        semaphore.release()
Run Code Online (Sandbox Code Playgroud)

如果您使用的是Python 2.6+,则可以变得更加简单,因为您可以使用“ with”语句来执行Acquisition和Release调用。

def run_command(cmd):
    with semaphore:
        os.system(cmd)
Run Code Online (Sandbox Code Playgroud)

最后,为了表明它能按预期工作,我们将调用“ sleep 10”命令八次。

for i in range(8):
    threading.Thread(target=run_command, args=("sleep 10", )).start()
Run Code Online (Sandbox Code Playgroud)

使用“时间”程序运行脚本显示,由于并行运行了四个睡眠中的两个,因此只需要20秒。

aw@aw-laptop:~/personal/stackoverflow$ time python 4992400.py 

real    0m20.032s                                                                                                                                                                   
user    0m0.020s                                                                                                                                                                    
sys     0m0.008s 
Run Code Online (Sandbox Code Playgroud)


Max*_*Max 5

我将Sven和Thuener的解决方案合并为一个解决方案,该解决方案等待跟踪进程,并在其中一个进程崩溃时也停止运行:

def removeFinishedProcesses(processes):
    """ given a list of (commandString, process), 
        remove those that have completed and return the result 
    """
    newProcs = []
    for pollCmd, pollProc in processes:
        retCode = pollProc.poll()
        if retCode==None:
            # still running
            newProcs.append((pollCmd, pollProc))
        elif retCode!=0:
            # failed
            raise Exception("Command %s failed" % pollCmd)
        else:
            logging.info("Command %s completed successfully" % pollCmd)
    return newProcs

def runCommands(commands, maxCpu):
            processes = []
            for command in commands:
                logging.info("Starting process %s" % command)
                proc =  subprocess.Popen(shlex.split(command))
                procTuple = (command, proc)
                processes.append(procTuple)
                while len(processes) >= maxCpu:
                    time.sleep(.2)
                    processes = removeFinishedProcesses(processes)

            # wait for all processes
            while len(processes)>0:
                time.sleep(0.5)
                processes = removeFinishedProcesses(processes)
            logging.info("All processes completed")
Run Code Online (Sandbox Code Playgroud)


unh*_*ler 2

您要求的是一个线程池。可用于执行任务的线程数量是固定的。当不运行任务时,它会等待任务队列以获得要执行的新代码段。

有这个线程池模块,但是有评论说它还没有被认为是完整的。可能还有其他软件包,但这是我发现的第一个软件包。