Python 多处理 - 是否可以在单个进程之间引入固定的时间延迟?

kat*_*tsu 6 python multithreading batch-file multiprocessing

我已经搜索过,但在其他地方找不到这个问题的答案。希望我没有错过任何东西。

我正在尝试使用 Python 多处理来并行批量运行一些专有模型。比如说,我有 200 个模拟,我想一次批量运行大约 10-20 个。我的问题是,如果两个模型碰巧同时/相似的时间启动,专有软件就会崩溃。我需要在多处理产生的进程之间引入延迟,以便每个新模型运行在开始之前等待一点。

到目前为止,我的解决方案是在子进程启动模型运行之前引入随机时间延迟。但是,这只会降低任意两次运行同时开始的概率,因此我在尝试处理大量模型时仍然会遇到问题。因此,我认为需要将时间延迟内置到代码的多处理部分中,但我找不到任何文档或示例。

编辑:我使用的是 Python 2.7

到目前为止,这是我的代码:

from time import sleep
import numpy as np
import subprocess
import multiprocessing

def runmodels(arg):
    sleep(np.random.rand(1,1)*120) # this is my interim solution to reduce the probability that any two runs start at the same time, but it isn't a guaranteed solution
    subprocess.call(arg) # this line actually fires off the model run

if __name__ == '__main__':    

    arguments =     [big list of runs in here
                    ]    

    count = 12
    pool = multiprocessing.Pool(processes = count)
    r = pool.imap_unordered(runmodels, arguments)      
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

jfs*_*jfs 6

multiprocessing.Pool()已经限制了并发运行的进程数。

您可以使用锁来分隔进程的启动时间(未测试):

import threading
import multiprocessing

def init(lock):
    global starting
    starting = lock

def run_model(arg):
    starting.acquire() # no other process can get it until it is released
    threading.Timer(1, starting.release).start() # release in a second
    # ... start your simulation here

if __name__=="__main__":
   arguments = ...
   pool = Pool(processes=12, 
               initializer=init, initargs=[multiprocessing.Lock()])
   for _ in pool.imap_unordered(run_model, arguments):
       pass
Run Code Online (Sandbox Code Playgroud)


Que*_*and 1

使用线程和信号量执行此操作的一种方法:

from time import sleep
import subprocess
import threading


def runmodels(arg):
    subprocess.call(arg)
    sGlobal.release() # release for next launch


if __name__ == '__main__':
    threads = []
    global sGlobal
    sGlobal = threading.Semaphore(12) #Semaphore for max 12 Thread
    arguments =  [big list of runs in here
                ]
    for arg in arguments :
        sGlobal.acquire() # Block if more than 12 thread
        t = threading.Thread(target=runmodels, args=(arg,))
        threads.append(t)
        t.start()
        sleep(1)

    for t in threads :
        t.join()
Run Code Online (Sandbox Code Playgroud)