Python: running cat subprocesses in parallel

lia*_*ker 14 python shell subprocess python-multithreading

I'm running several `cat | zgrep` commands on a remote server and collecting their output individually for further processing:

import multiprocessing as mp
import subprocess

class MainProcessor(mp.Process):
    def __init__(self, peaks_array):
        super(MainProcessor, self).__init__()
        self.peaks_array = peaks_array

    def run(self):
        for peak_arr in self.peaks_array:
            peak_processor = PeakProcessor(peak_arr)
            peak_processor.start()

class PeakProcessor(mp.Process):
    def __init__(self, peak_arr):
        super(PeakProcessor, self).__init__()
        self.peak_arr = peak_arr

    def run(self):
        command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
        log_lines = subprocess.check_output(command, shell=True).decode().splitlines()
        process_data(log_lines)

However, this results in sequential execution of the subprocess (`'ssh ... cat ...'`) commands: the second peak waits for the first to finish, and so on.

How can I modify this code so that the subprocess calls run in parallel, while still being able to collect the output of each one individually?

jfs*_*jfs 33

You need neither `multiprocessing` nor `threading` to run subprocesses in parallel, e.g.:

#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]

It runs 5 shell commands simultaneously. Note: neither threads nor the `multiprocessing` module are used here. There is no need to append an ampersand `&` to the shell commands: `Popen` doesn't wait for the command to complete; you have to call `.wait()` explicitly.
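The difference is easy to see by timing it. A minimal sketch (the `sleep` commands stand in for the real `ssh ... | zgrep` jobs): sequential `check_call` takes the sum of the run times, while `Popen` followed by `.wait()` takes roughly the longest single one.

```python
#!/usr/bin/env python
import time
from subprocess import Popen, check_call

# Sequential: each check_call() blocks until its command exits (~3s total).
t0 = time.time()
for _ in range(3):
    check_call("sleep 1", shell=True)
sequential = time.time() - t0

# Parallel: Popen() returns immediately; .wait() collects the exit codes (~1s).
t0 = time.time()
processes = [Popen("sleep 1", shell=True) for _ in range(3)]
exitcodes = [p.wait() for p in processes]
parallel = time.time() - t0

print("sequential: %.1fs  parallel: %.1fs" % (sequential, parallel))
```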

It is convenient, but not necessary, to use threads to collect the output of the subprocesses:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)
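An equivalent sketch with the stdlib `concurrent.futures` thread pool (Python 3.2+), which can stand in for `multiprocessing.dummy.Pool` here. Each `communicate()` call blocks while draining one pipe, but only the pool thread that called it, so all pipes are read at the same time:

```python
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
from subprocess import Popen, PIPE, STDOUT

commands = ["echo start {i}; sleep 1; echo done {i}".format(i=i)
            for i in range(5)]

# start all commands in parallel
processes = [Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT)
             for cmd in commands]

# one pool thread per process drains its pipe and waits for it to exit
with ThreadPoolExecutor(len(processes)) as pool:
    outputs = list(pool.map(lambda p: p.communicate()[0].splitlines(),
                            processes))
print(outputs)
```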

Related: Python threading multiple bash subprocesses?

Here's a code example that gets the output of several subprocesses concurrently in the same thread:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def get_lines(shell_command):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (await p.communicate())[0].splitlines()

async def main():
    # get the commands' output in parallel
    coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                       .format(i=i, e=sys.executable)) for i in range(5)]
    return await asyncio.gather(*coros)

# asyncio.run() (Python 3.7+) sets up the right event loop automatically,
# including the proactor loop that subprocess pipes need on Windows
print(asyncio.run(main()))

  • @SaheelGodhane: the `multiprocessing.dummy.Pool()`-based solution uses *multiple* threads. The `asyncio` solution here uses a *single* thread. To understand how to do several things at once in a single thread, see [Python Concurrency From the Ground Up: LIVE!](http://www.youtube.com/watch?v=MCs5OvhV9S4) (3 upvotes)
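The single-thread point can be checked directly (a sketch, not part of the original answer): two `asyncio.sleep(1)` waits on one event loop overlap, so the total runtime is about one second, and both coroutines run on the main OS thread:

```python
#!/usr/bin/env python3
import asyncio
import threading
import time

async def napper(i):
    # every coroutine here runs on the same (main) OS thread
    assert threading.current_thread() is threading.main_thread()
    await asyncio.sleep(1)
    return i

async def main():
    return await asyncio.gather(napper(0), napper(1))

t0 = time.time()
results = asyncio.run(main())
elapsed = time.time() - t0
print(results, "%.1fs" % elapsed)  # the two 1-second waits overlap
```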

Fro*_*its 0

Another approach (rather than the other suggestion of putting the shell processes in the background) is to use multithreading.

The `run` method that you have would then do something like this:

thread.start_new_thread(myFuncThatDoesZGrep, ())

To collect the results, you can do something like this:

class MyThread(threading.Thread):
    def run(self):
        self.finished = False
        # Your code to run the command here.
        blahBlah()
        # When finished....
        self.finished = True
        self.results = []

Run the thread as described in the multithreading link above. When your thread object has `myThread.finished == True`, you can collect the results via `myThread.results`.
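The same pattern can be written against the modern `threading` API, using `join()` instead of polling a `finished` flag; the real `ssh ... | zgrep` command is replaced with a placeholder `echo` so the sketch is self-contained:

```python
#!/usr/bin/env python
import subprocess
import threading

class CommandThread(threading.Thread):
    def __init__(self, command):
        super(CommandThread, self).__init__()
        self.command = command
        self.results = None

    def run(self):
        # blocks only this thread, not the others
        out = subprocess.check_output(self.command, shell=True)
        self.results = out.splitlines()

threads = [CommandThread("echo line-%d" % i) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for completion instead of polling a flag
print([t.results for t in threads])
```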

  • No, when you run a subprocess your code blocks (stops running) until the command finishes. So each time run() executes, it runs each command serially. If you want to do this in parallel, that's where threads come in handy: run multiple threads in parallel, each running one command serially. (2 upvotes)