Ven*_*tta 13 python parallel-processing subprocess multiprocessing
我理解使用subprocess是调用外部命令的首选方式.
但是如果我想在parall中运行几个命令,但是限制生成的进程数呢?困扰我的是我无法阻止子进程.例如,如果我打电话
subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)
Run Code Online (Sandbox Code Playgroud)
然后该过程将继续,无需等待cmd完成.因此,我无法将其包装在multiprocessing图书馆的工作人员中.
例如,如果我这样做:
def worker(cmd):
subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);
pool = Pool( processes = 10 );
results =[pool.apply_async(worker, [cmd]) for cmd in cmd_list];
ans = [res.get() for res in results];
Run Code Online (Sandbox Code Playgroud)
然后每个工人将在产生子流程后完成并返回.所以我无法真正限制subprocess使用生成的进程数Pool.
什么是限制子过程数量的正确方法?
jfs*_*jfs 13
您不需要多个Python进程甚至线程来限制并行子进程的最大数量:
from itertools import izip_longest
from subprocess import Popen, STDOUT
groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT)
for cmd in commands)] * limit # itertools' grouper recipe
for processes in izip_longest(*groups): # run len(processes) == limit at a time
for p in filter(None, processes):
p.wait()
Run Code Online (Sandbox Code Playgroud)
请参阅Python中的chunks(of n)迭代迭代器?
如果要限制并行子进程的最大和最小数量,可以使用线程池:
from multiprocessing.pool import ThreadPool
from subprocess import STDOUT, call
def run(cmd):
return cmd, call(cmd, stdout=outputfile, stderr=STDOUT)
for cmd, rc in ThreadPool(limit).imap_unordered(run, commands):
if rc != 0:
print('{cmd} failed with exit status: {rc}'.format(**vars()))
Run Code Online (Sandbox Code Playgroud)
只要任何子limit进程结束,就会启动一个新的子进程以始终保持limit子进程数.
from concurrent.futures import ThreadPoolExecutor # pip install futures
from subprocess import STDOUT, call
with ThreadPoolExecutor(max_workers=limit) as executor:
for cmd in commands:
executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT)
Run Code Online (Sandbox Code Playgroud)
这是一个简单的线程池实现:
import subprocess
from threading import Thread
try: from queue import Queue
except ImportError:
from Queue import Queue # Python 2.x
def worker(queue):
for cmd in iter(queue.get, None):
subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT)
q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)]
for t in threads: # start workers
t.daemon = True
t.start()
for cmd in commands: # feed commands to threads
q.put_nowait(cmd)
for _ in threads: q.put(None) # signal no more commands
for t in threads: t.join() # wait for completion
Run Code Online (Sandbox Code Playgroud)
为避免过早退出,请添加异常处理.
如果要捕获字符串中的子进程输出,请参阅Python:并行执行cat子进程.
subprocess.call如果要等待命令完成,可以使用.有关pydoc subprocess更多信息,请参阅
您也可以Popen.wait在您的worker中调用该方法:
def worker(cmd):
p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);
p.wait()
Run Code Online (Sandbox Code Playgroud)