只要一个完成并在队列中启动下一个作业,Python多个子进程就会有一个池/队列恢复输出

gma*_*rco 13 python queue parallel-processing subprocess stdout

我正在启动一个子进程并在运行时解析stdout而不必等待它完成解析stdout.

for sample in all_samples:
    my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample),shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        myline = my_tool_subprocess.stdout.readline()
        #here I parse stdout..
Run Code Online (Sandbox Code Playgroud)

在我的脚本中,我多次执行此操作,实际上取决于输入样本的数量.

这里的主要问题是每个子进程都是一个程序/工具,它在运行时使用1个CPU 100%.这需要一些时间......每次输入可能需要20-40分钟.

我想什么来实现的,是设置一个游泳池,队列(我不知道这是怎么确切的术语)在同时运行N最大的子进程的工作过程.所以我可以最大化性能,而不是按顺序进行.

因此,执行流程,例如最多4个作业池应该是:

  • 启动4个子流程.
  • 当其中一个作业完成后,解析stdout并启动下一个.
  • 执行此操作直到队列中的所有作业完成.

如果我能实现这一点,我真的不知道如何识别哪个样本子流程已完成.此时,我不需要识别它们,因为每个子进程按顺序运行,并且我解析stdout,因为子进程正在打印stdout.

这非常重要,因为我需要识别每个子流程的输出并将其分配给它的相应输入/样本.

GP8*_*P89 17

ThreadPool 可能非常适合您的问题,您可以设置工作线程数并添加作业,并且线程将完成所有任务.

from multiprocessing.pool import ThreadPool
import subprocess


def work(sample):
    my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample),shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        myline = my_tool_subprocess.stdout.readline()
        #here I parse stdout..


num = None  # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
for sample in all_samples:
    tp.apply_async(work, (sample,))

tp.close()
tp.join()
Run Code Online (Sandbox Code Playgroud)