Jas*_*ino 17 python multiprocessing
I am trying to complete 100 model runs on my 8-processor, 64-bit Windows 7 machine. I would like to run 7 instances of the model concurrently to cut down my total run time (each model run takes approximately 9.5 minutes). I have looked at several threads pertaining to Python's Multiprocessing module, but am still missing something.
My process:
I have 100 different parameter sets I would like to run through SEAWAT/MODFLOW to compare the results. I have pre-built the model input files for each model run and stored them in their own directories. What I would like to be able to do is run 7 models at a time until all realizations have been completed. There needn't be communication between processes or display of results. So far I have only been able to spawn the models sequentially:
import os,subprocess
import multiprocessing as mp

ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
    if f.endswith('.npy'):
        files.append(f)

## def work(cmd):
##     return subprocess.call(cmd, shell=False)

def run(f,def_param=ws):
    real = f.split('_')[2].split('.')[0]
    print 'Realization %s' % real

    mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
    mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
    seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '

    exe = seawatV4x64
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real

    os.system( exe + swt_nam )

if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    tasks = range(len(files))
    results = []
    for f in files:
        r = p.map_async(run(f), tasks, callback=results.append)
I changed the if __name__ == '__main__': block to the following in hopes it would fix the lack of parallelism I felt was being imparted on the above script by the for loop. However, the model fails to even run (no Python error):
if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    p.map_async(run,((files[f],) for f in range(len(files))))
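(For what it's worth, the core problem in the first snippet is that run(f) is called immediately in the main process, and its return value, None, is what map_async receives as the worker function. A minimal sketch of the difference, using a stand-in square function rather than the model runner:)

```python
import multiprocessing as mp

def square(x):
    return x * x

def demo():
    pool = mp.Pool(processes=2)
    try:
        # Wrong: pool.map_async(square(3), items) would evaluate square(3)
        # right here and hand map_async its *return value*, not the function.
        # Right: pass the callable itself and let the pool invoke it.
        return pool.map(square, [1, 2, 3])
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':
    print(demo())  # [1, 4, 9]
```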
Any and all help is greatly appreciated!
EDIT 3/26/2012 13:31 EST
Using the "manual pool" method in @J.F. Sebastian's answer below, I got parallel execution of my external .exe. Model realizations are called up in batches of 8 at a time, but it doesn't wait for those 8 runs to complete before calling up the next batch, and so on:
from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam])

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':
    mp.freeze_support() # optional if the program is not frozen
    main()
No error traceback is available. The run() function fulfills its duty when called on a single model realization file, just as with multiple files. The only difference is that with multiple files it is called len(files) times, though each of the instances immediately closes and only one model run is allowed to finish, at which point the script exits gracefully (exit code 0).
Adding some print statements to main() reveals some information about active thread counts as well as thread status (note that this is a test on only 8 of the realization files to make the screenshot more manageable; theoretically all 8 files should be run concurrently, however the behavior continues where they are spawned and immediately die, except for one):
# (this fragment also requires `import threading` alongside the imports above)
def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\test')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    print('Active Count a',threading.activeCount())
    for _ in threads:
        print(_)
        q.put_nowait(None) # signal no more files
    for t in threads:
        print(t)
        t.join() # wait for completion
    print('Active Count b',threading.activeCount())

**The lines reading "D:\\Data\\Users..." are the error information thrown when I manually stop the model from running to completion. Once I stop the model run, the remaining thread status lines are reported and the script exits.
EDIT 3/26/2012 16:24 EST
SEAWAT does allow concurrent execution; I've done this in the past, spawning instances manually using iPython and launching from each model file folder. This time around, I'm launching all model runs from a single location, namely the directory where my script resides. It looks like the culprit may be in the way SEAWAT is saving some of the output. When SEAWAT is run, it immediately creates files pertaining to the model run. One of these files is not being saved to the directory in which the model realization is located, but in the top directory where the script is located. This prevents any subsequent threads from saving the same file name in the same location (which they all want to do, since these filenames are generic and non-specific to each realization). The SEAWAT windows were not staying open long enough for me to read, or even see, that there was an error message; I only realized this when I went back and tried to run the code using iPython, which directly displays the printout from SEAWAT instead of opening a new window to run the program.
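(The fix that follows from this is exactly the cwd argument of subprocess: give each realization its own working directory so the generically named output files no longer collide. A minimal sketch, using a hypothetical shell command in place of the SEAWAT executable; POSIX shown for brevity, but the cwd= mechanism is identical on Windows:)

```python
import os
import subprocess
import tempfile

# Stand-in for the model executable: a command that writes 'output.lst'
# into whatever its *current* working directory happens to be.
cmd = ['sh', '-c', 'echo done > output.lst']

base = tempfile.mkdtemp()
for real in ('real1', 'real2'):
    d = os.path.join(base, real)
    os.makedirs(d)
    # cwd= starts each run in its own directory, so the identically
    # named output files land side by side instead of clobbering a
    # single copy next to the launching script.
    subprocess.check_call(cmd, cwd=d)

print(sorted(os.listdir(base)))  # ['real1', 'real2']
```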
I'm accepting @J.F. Sebastian's answer, as it's likely that once I resolve this model-executable issue, the threading code he has provided will get me where I need to be.
Final code
Added the cwd argument in subprocess.check_call to start each instance of SEAWAT in its own directory. Very key.
from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    cwd = ws + r'\reals\real%s\ss' % real
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd)

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':
    mp.freeze_support() # optional if the program is not frozen
    main()
jfs*_*jfs 16
I don't see any computations in the Python code. If you just need to execute several external programs in parallel, it is sufficient to use subprocess to run the programs and the threading module to maintain a constant number of running processes, but the simplest code is to use multiprocessing.Pool:
#!/usr/bin/env python
import os
import multiprocessing as mp

def run(filename_def_param):
    filename, def_param = filename_def_param # unpack arguments
    ... # call external program on `filename`

def safe_run(*args, **kwargs):
    """Call run(), catch exceptions."""
    try: run(*args, **kwargs)
    except Exception as e:
        print("error: %s run(*%r, **%r)" % (e, args, kwargs))

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    workdir = os.path.join(ws, r'fieldgen\reals')
    files = ((os.path.join(workdir, f), ws)
             for f in os.listdir(workdir) if f.endswith('.npy'))

    # start processes
    pool = mp.Pool() # use all available CPUs
    pool.map(safe_run, files)

if __name__=="__main__":
    mp.freeze_support() # optional if the program is not frozen
    main()
If there are many files then pool.map() could be replaced by for _ in pool.imap_unordered(safe_run, files): pass.
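(Sketch of that variant, with a hypothetical run stand-in; imap_unordered consumes the possibly huge iterable lazily and yields results as each worker finishes, rather than building the whole result list up front:)

```python
import multiprocessing as mp

def run(filename):
    # stand-in for calling the external program on `filename`
    return filename

def process_all(files):
    pool = mp.Pool(processes=2)
    try:
        # consume lazily; we only care about the side effects of run()
        for _ in pool.imap_unordered(run, files):
            pass
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':
    process_all('realization_%d.npy' % i for i in range(10))
```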
There is also multiprocessing.dummy.Pool, which provides the same interface as multiprocessing.Pool but uses threads instead of processes, which might be more appropriate in this case.
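(Since the heavy lifting here is an external process, and the subprocess call releases the GIL while it waits, the thread-backed pool is a drop-in swap. A sketch with a hypothetical run function:)

```python
from multiprocessing.dummy import Pool  # thread pool, same API as mp.Pool

def run(filename):
    # stand-in for launching the external program on `filename`
    return filename.upper()

pool = Pool(4)          # 4 worker threads instead of 4 processes
results = pool.map(run, ['a.npy', 'b.npy'])
pool.close()
pool.join()
print(results)  # ['A.NPY', 'B.NPY']
```

No if __name__ == '__main__' guard is needed here, because no child processes are spawned.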
You don't need to keep some CPUs free. Just use a command that starts your executables at a low priority (on Linux it is the nice program).
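(A sketch of that idea on POSIX: prefix the command with nice so the model processes yield the CPU to interactive work. On Windows, start /low or, on Python 3.7+, the creationflags=subprocess.BELOW_NORMAL_PRIORITY_CLASS argument plays the same role. The executable path and argument below are stand-ins:)

```python
import subprocess

exe = '/bin/echo'              # stand-in for the model executable
swt_nam = 'realization_1'      # stand-in for its argument

# `nice -n 19` runs the child at the lowest scheduling priority, so
# all CPUs can be used without starving the rest of the system.
cmd = ['nice', '-n', '19', exe, swt_nam]
subprocess.check_call(cmd)
```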
ThreadPoolExecutor example
concurrent.futures.ThreadPoolExecutor would be both simple and sufficient, but it requires a third-party dependency on Python 2.x (it has been in the stdlib since Python 3.2).
#!/usr/bin/env python
import os
import concurrent.futures

def run(filename, def_param):
    ... # call external program on `filename`

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
files = (os.path.join(wdir, f) for f in os.listdir(wdir) if f.endswith('.npy'))

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    future_to_file = dict((executor.submit(run, f, ws), f) for f in files)

    for future in concurrent.futures.as_completed(future_to_file):
        f = future_to_file[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (f, future.exception()))
        # run() doesn't return anything so `future.result()` is always `None`
Or if we ignore exceptions raised by run():
from itertools import repeat
... # the same

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
     executor.map(run, files, repeat(ws))
     # run() doesn't return anything so `map()` results can be ignored
subprocess + threading (manual pool) solution

#!/usr/bin/env python
from __future__ import print_function
import os
import subprocess
import sys
from Queue import Queue
from threading import Thread

def run(filename, def_param):
    ... # define exe, swt_nam
    subprocess.check_call([exe, swt_nam]) # run external program

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

# start threads
q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
    t.daemon = True # threads die if the program dies
    t.start()

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
for f in os.listdir(wdir):
    if f.endswith('.npy'):
        q.put_nowait((os.path.join(wdir, f), ws))

for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
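(The worker loop above leans on the two-argument form of iter(): iter(queue.get, None) keeps calling queue.get() and stops as soon as it returns the None sentinel, which is why one sentinel is queued per thread. A self-contained sketch of the pattern, using Python 3 module names:)

```python
from queue import Queue
from threading import Thread

def worker(q, seen):
    # two-argument iter(): call q.get() until it returns the sentinel
    for item in iter(q.get, None):
        seen.append(item)

q = Queue()
seen = []
for name in ['a.npy', 'b.npy', 'c.npy']:
    q.put_nowait(name)

threads = [Thread(target=worker, args=(q, seen)) for _ in range(2)]
for t in threads:
    t.start()
for _ in threads:
    q.put_nowait(None)   # one sentinel per worker thread
for t in threads:
    t.join()

print(sorted(seen))  # ['a.npy', 'b.npy', 'c.npy']
```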