Python:并行运行子进程

ima*_*hat 21 python subprocess

我有以下代码将md5sums写入日志文件

for file in files_output:
    p=subprocess.Popen(['md5sum',file],stdout=logfile)
p.wait()
Run Code Online (Sandbox Code Playgroud)
  1. 这些是并行写的吗?即如果md5sum对其中一个文件花费很长时间,那么在等待前一个文件完成之前是否会启动另一个文件?

  2. 如果上面的答案是肯定的,我可以假设写入日志文件的md5sums的顺序可能因md5sum对每个文件的使用时间而有所不同吗?(有些文件可能很大,有些文件很小)

dkz*_*dkz 20

  1. 是的,这些md5sum进程将并行启动.
  2. 是的,md5sums写入的顺序将是不可预测的.通常,以这种方式从许多进程共享像文件这样的单个资源被认为是一种不好的做法.

此外,您p.wait()for循环之后的制作方式将等待最后一个md5sum进程完成,其余的可能仍在运行.

但是,如果将md5sum输出收集到临时文件中并在完成所有进程后将其收回到一个文件中,则可以稍微修改此代码,以便仍然具有并行处理和同步输出可预测性的优势.

import subprocess
import os

processes = []
for file in files_output:
    f = os.tmpfile()
    p = subprocess.Popen(['md5sum',file],stdout=f)
    processes.append((p, f))

for p, f in processes:
    p.wait()
    f.seek(0)
    logfile.write(f.read())
    f.close()
Run Code Online (Sandbox Code Playgroud)

  • 是的,`processes []`将保留`files_output []`的原始顺序,并确保每个md5sum进程都完成.但是如果你关心操作系统的资源,你应该考虑使用任务队列的线程池和同步md5sum在每个线程中运行`subprocess.check_output()`作为@Alfe提议. (2认同)

Alf*_*lfe 18

所有子流程并行运行.(为了避免这一点,必须明确等待它们的完成.)它们甚至可以同时写入日志文件,从而使输出变得混乱.为避免这种情况,您应该让每个进程写入不同的日志文件,并在所有进程完成后收集所有输出.

q = Queue.Queue()
result = {}  # used to store the results
for fileName in fileNames:
  q.put(fileName)

def worker():
  while True:
    fileName = q.get()
    if fileName is None:  # EOF?
      return
    subprocess_stuff_using(fileName)
    wait_for_finishing_subprocess()
    checksum = collect_md5_result_for(fileName)
    result[fileName] = checksum  # store it

threads = [ threading.Thread(target=worker) for _i in range(20) ]
for thread in threads:
  thread.start()
  q.put(None)  # one EOF marker for each thread
Run Code Online (Sandbox Code Playgroud)

在此之后,结果应存储在result.

  • 不,你不应该.创建一个`Queue.Queue`和几十个线程的线程池,让每个线程从队列中读取一个元素并为该元素启动一个子进程,等待这个子进程的完成,得到结果(md5校验和) ,将结果存储在映射中.如果队列为空,则线程应终止. (2认同)
  • 如果不直接查看您的代码,我不知道。代码当前声明如下:将所有任务(按原始顺序)放入队列中,并告诉 20 个工作人员每个执行以下操作:从队列中取出一个任务并处理它,继续,直到从队列中获得 EOF(无)。因为工人是并行工作的,这当然意味着最后完成第一个任务(第二十个任务)的工人可以第一个完成他的任务。这将更改结果到达的顺序。但这取决于任务所需的时间。 (2认同)

jfs*_*jfs 7

从并行md5sum子进程收集输出的一种简单方法是使用线程池并从主进程写入文件:

from multiprocessing.dummy import Pool # use threads
from subprocess import check_output

def md5sum(filename):
    try:
        return check_output(["md5sum", filename]), None
    except Exception as e:
        return None, e

if __name__ == "__main__":
    p = Pool(number_of_processes) # specify number of concurrent processes
    with open("md5sums.txt", "wb") as logfile:
        for output, error in p.imap(md5sum, filenames): # provide filenames
            if error is None:
               logfile.write(output)
Run Code Online (Sandbox Code Playgroud)
  • 输出md5sum很小,因此您可以将其存储在内存中
  • imap 保留订单
  • number_of_processes 可能与文件或CPU内核的数量不同(较大的值并不意味着更快:它取决于IO(磁盘)和CPU的相对性能)

您可以尝试一次将多个文件传递给md5sum子进程.

在这种情况下,您不需要外部子进程; 你可以用Python计算md5:

import hashlib
from functools import partial

def md5sum(filename, chunksize=2**15, bufsize=-1):
    m = hashlib.md5()
    with open(filename, 'rb', bufsize) as f:
        for chunk in iter(partial(f.read, chunksize), b''):
            m.update(chunk)
    return m.hexdigest()
Run Code Online (Sandbox Code Playgroud)

要使用多个进程而不是线程(允许纯Python md5sum()使用多个CPU并行运行),只需.dummy从上面的代码中导入即可.