kat*_*tsu 6 python multithreading batch-file multiprocessing
我已经搜索过,但在其他地方找不到这个问题的答案。希望我没有错过任何东西。
我正在尝试使用 Python 多处理来并行批量运行一些专有模型。比如说,我有 200 个模拟,我想一次批量运行大约 10-20 个。我的问题是,如果两个模型碰巧同时/相似的时间启动,专有软件就会崩溃。我需要在多处理产生的进程之间引入延迟,以便每个新模型运行在开始之前等待一点。
到目前为止,我的解决方案是在子进程启动模型运行之前引入随机时间延迟。但是,这只会降低任意两次运行同时开始的概率,因此我在尝试处理大量模型时仍然会遇到问题。因此,我认为需要将时间延迟内置到代码的多处理部分中,但我找不到任何文档或示例。
编辑:我使用的是 Python 2.7
到目前为止,这是我的代码:
from time import sleep
import numpy as np
import subprocess
import multiprocessing
def runmodels(arg):
sleep(np.random.rand(1,1)*120) # this is my interim solution to reduce the probability that any two runs start at the same time, but it isn't a guaranteed solution
subprocess.call(arg) # this line actually fires off the model run
if __name__ == '__main__':
arguments = [big list of runs in here
]
count = 12
pool = multiprocessing.Pool(processes = count)
r = pool.imap_unordered(runmodels, arguments)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
multiprocessing.Pool()已经限制了并发运行的进程数。
您可以使用锁来分隔进程的启动时间(未测试):
import threading
import multiprocessing
def init(lock):
global starting
starting = lock
def run_model(arg):
starting.acquire() # no other process can get it until it is released
threading.Timer(1, starting.release).start() # release in a second
# ... start your simulation here
if __name__=="__main__":
arguments = ...
pool = Pool(processes=12,
initializer=init, initargs=[multiprocessing.Lock()])
for _ in pool.imap_unordered(run_model, arguments):
pass
Run Code Online (Sandbox Code Playgroud)
使用线程和信号量执行此操作的一种方法:
from time import sleep
import subprocess
import threading
def runmodels(arg):
subprocess.call(arg)
sGlobal.release() # release for next launch
if __name__ == '__main__':
threads = []
global sGlobal
sGlobal = threading.Semaphore(12) #Semaphore for max 12 Thread
arguments = [big list of runs in here
]
for arg in arguments :
sGlobal.acquire() # Block if more than 12 thread
t = threading.Thread(target=runmodels, args=(arg,))
threads.append(t)
t.start()
sleep(1)
for t in threads :
t.join()
Run Code Online (Sandbox Code Playgroud)