Running simultaneous, separate SEAWAT/MODFLOW model runs with Python's multiprocessing module

Jas*_*ino 17 python multiprocessing

I am trying to complete 100 model runs on my 8-processor, 64-bit Windows 7 machine. I would like to run 7 instances of the model concurrently to cut down my total run time (each model run takes approximately 9.5 minutes). I have looked at several threads pertaining to Python's multiprocessing module, but I am still missing something.

Using the multiprocessing module

How can I spawn parallel child processes on a multi-processor system?

Python multiprocessing queue

My process:

I have 100 different parameter sets that I would like to run through SEAWAT/MODFLOW to compare the results. I have pre-built the model input files for each model run and stored them in their own directories. What I would like to be able to do is run 7 models at a time until all realizations have been completed. There is no need for communication between processes or for displaying results. So far I have only been able to spawn the models sequentially:

import os,subprocess
import multiprocessing as mp

ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
    if f.endswith('.npy'):
        files.append(f)

## def work(cmd):
##     return subprocess.call(cmd, shell=False)

def run(f,def_param=ws):
    real = f.split('_')[2].split('.')[0]
    print 'Realization %s' % real

    mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
    mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
    seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '

    exe = seawatV4x64
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real

    os.system( exe + swt_nam )


if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    tasks = range(len(files))
    results = []
    for f in files:
        r = p.map_async(run(f), tasks, callback=results.append)

I changed the `if __name__ == 'main':` to the following in hopes that it would fix the lack of parallelism I felt was being imparted upon the script above by the for loop. However, the model fails to even run (no Python error):

if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    p.map_async(run,((files[f],) for f in range(len(files))))

Any and all help is greatly appreciated!

EDIT 3/26/2012 13:31 EST

Using the "Manual Pool" method in @J.F. Sebastian's answer below, I got parallel execution of my external .exe. Model realizations are called up in batches of 8 at a time, but it doesn't wait for those 8 runs to complete before calling up the next batch, and so on:

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam])

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()

    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':

    mp.freeze_support() # optional if the program is not frozen
    main()

No error traceback is available. The run() function performs its duty when called upon a single model realization file, as it does with multiple files. The only difference is that with multiple files it is called len(files) times, though each of the instances immediately closes and only one model run is allowed to finish, at which point the script exits gracefully (exit code 0).

Adding some print statements to main() reveals some information about active thread counts as well as thread status (note, this is a test on only 8 of the realization files to make the screenshot more manageable; theoretically all 8 files should run concurrently, however the behavior continues where they spawn and immediately die, except for one):

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\test')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    print('Active Count a',threading.activeCount())
    for _ in threads:
        print(_)
        q.put_nowait(None) # signal no more files
    for t in threads: 
        print(t)
        t.join() # wait for completion
    print('Active Count b',threading.activeCount())

Screenshot

**The line that reads "D:\\Data\\Users..." is the error information thrown when I manually stopped the model from running to completion. Once I stop the model run, the remaining thread-status lines are reported and the script exits.

EDIT 3/26/2012 16:24 EST

SEAWAT does allow concurrent execution; I have done this in the past, spawning instances manually using iPython and launching from each model file folder. This time around, I am launching all model runs from a single location, namely the directory where my script resides. It looks like the culprit may be the way SEAWAT saves some of its output. When SEAWAT is run, it immediately creates files pertaining to the model run. One of these files was not being saved to the directory in which the model realization is located, but to the top-level directory where the script is located. This prevented any subsequent threads from saving the same file name in the same location (which they all want to do, since these filenames are generic and non-specific to each realization). The SEAWAT windows were not staying open long enough for me to read, or even see, that there was an error message; I only realized this when I went back and tried to run the code in iPython, which directly displays the printout from SEAWAT instead of opening a new window to run the program.

I am accepting @J.F. Sebastian's answer, as once I work out this model-executable issue, the threading code he has provided will get me where I need to be.

FINAL CODE

Added the cwd argument to subprocess.check_call so that each instance of SEAWAT starts in its own directory. Very key.

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    cwd = ws + r'\reals\real%s\ss' % real
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd)

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':
    mp.freeze_support() # optional if the program is not frozen
    main()

jfs*_*jfs 16

I don't see any computations in the Python code. If you just need to execute several external programs in parallel, it is sufficient to use subprocess to run the programs and the threading module to maintain a constant number of processes running, but the simplest code is to use multiprocessing.Pool:

#!/usr/bin/env python
import os
import multiprocessing as mp

def run(filename_def_param): 
    filename, def_param = filename_def_param # unpack arguments
    ... # call external program on `filename`

def safe_run(*args, **kwargs):
    """Call run(), catch exceptions."""
    try: run(*args, **kwargs)
    except Exception as e:
        print("error: %s run(*%r, **%r)" % (e, args, kwargs))

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    workdir = os.path.join(ws, r'fieldgen\reals')
    files = ((os.path.join(workdir, f), ws)
             for f in os.listdir(workdir) if f.endswith('.npy'))

    # start processes
    pool = mp.Pool() # use all available CPUs
    pool.map(safe_run, files)

if __name__=="__main__":
    mp.freeze_support() # optional if the program is not frozen
    main()

If there are many files, then pool.map() could be replaced by for _ in pool.imap_unordered(safe_run, files): pass.

multiprocessing.dummy.Pool provides the same interface as multiprocessing.Pool but uses threads instead of processes, which might be more appropriate in this case.
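The swap is a one-line import change; a sketch (with a placeholder `run_one` standing in for the function that would launch the external .exe via subprocess):

```python
from multiprocessing.dummy import Pool  # thread-based Pool, same API

def run_one(name):
    # in the real script this would launch the external .exe via subprocess;
    # threads are fine here because the work is I/O-bound (waiting on the exe)
    return 'done: %s' % name

pool = Pool(4)  # 4 worker threads
outputs = pool.map(run_one, ['a', 'b', 'c'])
pool.close()
pool.join()
print(outputs)
```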

You don't need to keep some CPUs free. Just use a command that starts the executables with a low priority (on Linux it is the nice program).

ThreadPoolExecutor

concurrent.futures.ThreadPoolExecutor would be both simple and sufficient, but it requires a third-party dependency on Python 2.x (it has been in the stdlib since Python 3.2).

#!/usr/bin/env python
import os
import concurrent.futures

def run(filename, def_param):
    ... # call external program on `filename`

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
files = (os.path.join(wdir, f) for f in os.listdir(wdir) if f.endswith('.npy'))

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    future_to_file = dict((executor.submit(run, f, ws), f) for f in files)

    for future in concurrent.futures.as_completed(future_to_file):
        f = future_to_file[future]
        if future.exception() is not None:
           print('%r generated an exception: %s' % (f, future.exception()))
        # run() doesn't return anything so `future.result()` is always `None`

Or, if we ignore exceptions raised by run():

from itertools import repeat

... # the same

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
     executor.map(run, files, repeat(ws))
     # run() doesn't return anything so `map()` results can be ignored

subprocess + threading (manual pool) solution

#!/usr/bin/env python
from __future__ import print_function
import os
import subprocess
import sys
from Queue import Queue
from threading import Thread

def run(filename, def_param):
    ... # define exe, swt_nam
    subprocess.check_call([exe, swt_nam]) # run external program

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

# start threads
q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
    t.daemon = True # threads die if the program dies
    t.start()

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
for f in os.listdir(wdir):
    if f.endswith('.npy'):
        q.put_nowait((os.path.join(wdir, f), ws))

for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion