ima*_*hat 21 python subprocess
我有以下代码将md5sums写入日志文件
for file in files_output:
p=subprocess.Popen(['md5sum',file],stdout=logfile)
p.wait()
Run Code Online (Sandbox Code Playgroud)
这些是并行写的吗?即如果md5sum对其中一个文件花费很长时间,那么在等待前一个文件完成之前是否会启动另一个文件?
如果上面的答案是肯定的,我可以假设写入日志文件的md5sums的顺序可能因md5sum对每个文件的使用时间而有所不同吗?(有些文件可能很大,有些文件很小)
dkz*_*dkz 20
此外,您p.wait()在for循环之后的制作方式将等待最后一个md5sum进程完成,其余的可能仍在运行.
但是,如果将md5sum输出收集到临时文件中并在完成所有进程后将其收回到一个文件中,则可以稍微修改此代码,以便仍然具有并行处理和同步输出可预测性的优势.
import subprocess
import os
processes = []
for file in files_output:
f = os.tmpfile()
p = subprocess.Popen(['md5sum',file],stdout=f)
processes.append((p, f))
for p, f in processes:
p.wait()
f.seek(0)
logfile.write(f.read())
f.close()
Run Code Online (Sandbox Code Playgroud)
Alf*_*lfe 18
所有子流程并行运行.(为了避免这一点,必须明确等待它们的完成.)它们甚至可以同时写入日志文件,从而使输出变得混乱.为避免这种情况,您应该让每个进程写入不同的日志文件,并在所有进程完成后收集所有输出.
q = Queue.Queue()
result = {} # used to store the results
for fileName in fileNames:
q.put(fileName)
def worker():
while True:
fileName = q.get()
if fileName is None: # EOF?
return
subprocess_stuff_using(fileName)
wait_for_finishing_subprocess()
checksum = collect_md5_result_for(fileName)
result[fileName] = checksum # store it
threads = [ threading.Thread(target=worker) for _i in range(20) ]
for thread in threads:
thread.start()
q.put(None) # one EOF marker for each thread
Run Code Online (Sandbox Code Playgroud)
在此之后,结果应存储在result.
从并行md5sum子进程收集输出的一种简单方法是使用线程池并从主进程写入文件:
from multiprocessing.dummy import Pool # use threads
from subprocess import check_output
def md5sum(filename):
try:
return check_output(["md5sum", filename]), None
except Exception as e:
return None, e
if __name__ == "__main__":
p = Pool(number_of_processes) # specify number of concurrent processes
with open("md5sums.txt", "wb") as logfile:
for output, error in p.imap(md5sum, filenames): # provide filenames
if error is None:
logfile.write(output)
Run Code Online (Sandbox Code Playgroud)
md5sum很小,因此您可以将其存储在内存中imap 保留订单number_of_processes 可能与文件或CPU内核的数量不同(较大的值并不意味着更快:它取决于IO(磁盘)和CPU的相对性能)您可以尝试一次将多个文件传递给md5sum子进程.
在这种情况下,您不需要外部子进程; 你可以用Python计算md5:
import hashlib
from functools import partial
def md5sum(filename, chunksize=2**15, bufsize=-1):
m = hashlib.md5()
with open(filename, 'rb', bufsize) as f:
for chunk in iter(partial(f.read, chunksize), b''):
m.update(chunk)
return m.hexdigest()
Run Code Online (Sandbox Code Playgroud)
要使用多个进程而不是线程(允许纯Python md5sum()使用多个CPU并行运行),只需.dummy从上面的代码中导入即可.