我编写了一个简单的脚本,对一系列文件执行系统命令.为了加快速度,我想并行运行它们,但不是一次全部运行 - 我需要控制同时运行的命令的最大数量.什么是最简单的方法来解决这个问题?
Sve*_*ach 26
如果你正在调用子进程,我认为不需要使用线程池.使用该subprocess模块的基本实现将是
import subprocess
import os
import time
files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5
for name in files:
processes.add(subprocess.Popen([command, name]))
if len(processes) >= max_processes:
os.wait()
processes.difference_update([
p for p in processes if p.poll() is not None])
Run Code Online (Sandbox Code Playgroud)
在Windows上,os.wait()不可用(也没有任何其他等待任何子进程终止的方法).您可以通过在特定时间间隔内轮询来解决此问题:
for name in files:
processes.add(subprocess.Popen([command, name]))
while len(processes) >= max_processes:
time.sleep(.1)
processes.difference_update([
p for p in processes if p.poll() is not None])
Run Code Online (Sandbox Code Playgroud)
睡眠时间取决于子过程的预期执行时间.
Thu*_*ner 16
Sven Marnach的回答几乎是对的,但是有一个问题.如果最后一个max_processes进程结束,主程序将尝试启动另一个进程,for循环将结束.这将关闭主进程,进而可以关闭子进程.对我来说,这种行为发生在screen命令中.
Linux中的代码将是这样的(并且只适用于python2.7):
import subprocess
import os
import time
files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5
for name in files:
processes.add(subprocess.Popen([command, name]))
if len(processes) >= max_processes:
os.wait()
processes.difference_update(
[p for p in processes if p.poll() is not None])
#Check if all the child processes were closed
for p in processes:
if p.poll() is None:
p.wait()
Run Code Online (Sandbox Code Playgroud)
您需要将Semaphore对象与线程结合在一起。信号量是一个对象,可让您限制在给定代码段中运行的线程数。在这种情况下,我们将使用信号量来限制可以运行os.system调用的线程数。
首先,我们导入所需的模块:
#!/usr/bin/python
import threading
import os
Run Code Online (Sandbox Code Playgroud)
接下来,我们创建一个信号量对象。这里的数字四是一次可以获取信号量的线程数。这限制了可以一次运行的子流程的数量。
semaphore = threading.Semaphore(4)
Run Code Online (Sandbox Code Playgroud)
该函数只是将对子过程的调用包装在对信号量的调用中。
def run_command(cmd):
semaphore.acquire()
try:
os.system(cmd)
finally:
semaphore.release()
Run Code Online (Sandbox Code Playgroud)
如果您使用的是Python 2.6+,则可以变得更加简单,因为您可以使用“ with”语句来执行Acquisition和Release调用。
def run_command(cmd):
with semaphore:
os.system(cmd)
Run Code Online (Sandbox Code Playgroud)
最后,为了表明它能按预期工作,我们将调用“ sleep 10”命令八次。
for i in range(8):
threading.Thread(target=run_command, args=("sleep 10", )).start()
Run Code Online (Sandbox Code Playgroud)
使用“时间”程序运行脚本显示,由于并行运行了四个睡眠中的两个,因此只需要20秒。
aw@aw-laptop:~/personal/stackoverflow$ time python 4992400.py
real 0m20.032s
user 0m0.020s
sys 0m0.008s
Run Code Online (Sandbox Code Playgroud)
我将Sven和Thuener的解决方案合并为一个解决方案,该解决方案等待跟踪进程,并在其中一个进程崩溃时也停止运行:
def removeFinishedProcesses(processes):
""" given a list of (commandString, process),
remove those that have completed and return the result
"""
newProcs = []
for pollCmd, pollProc in processes:
retCode = pollProc.poll()
if retCode==None:
# still running
newProcs.append((pollCmd, pollProc))
elif retCode!=0:
# failed
raise Exception("Command %s failed" % pollCmd)
else:
logging.info("Command %s completed successfully" % pollCmd)
return newProcs
def runCommands(commands, maxCpu):
processes = []
for command in commands:
logging.info("Starting process %s" % command)
proc = subprocess.Popen(shlex.split(command))
procTuple = (command, proc)
processes.append(procTuple)
while len(processes) >= maxCpu:
time.sleep(.2)
processes = removeFinishedProcesses(processes)
# wait for all processes
while len(processes)>0:
time.sleep(0.5)
processes = removeFinishedProcesses(processes)
logging.info("All processes completed")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
33502 次 |
| 最近记录: |