使用多处理时对池映射操作设置时间限制吗?

Jia*_* Lu 5 python multiprocessing

pool map()在Python中使用时是否可以设置操作的时间限制multiprocessing。当达到时间限制时,所有子进程都会停止并返回它们已有的结果。

import multiprocessing as mp

def task(v):
    do something
    return result

if __name__ == '__main__':
    vs = [...]
    p= mp.Pool()
    results = p.map(task, vs)
Run Code Online (Sandbox Code Playgroud)

在上面的例子中,我有一个非常大的列表vs。理想情况下,列表中的所有元素都vs将被发送到 function task(),并且所有结果都将保存在resultslist 中。

然而,由于列表vs非常大,而且我执行此过程的时间有限(比如 5 分钟)。我需要的是在达到 5 分钟时停止该map过程,并将计算结果返回到 list results

编辑1:

我不会终止需要超过 5 分钟才能完成的任务。假设我的列表中有 1000 个任务vs,5 分钟后只完成了 600 个任务。我需要的是杀死所有子进程,并将这 600 个任务的结果保存到列表中results

Boo*_*boo 4

我查看了ForceBru 提到的答案,它使用了名为Pebble. 首先,我不明白有关“Python标准池不支持超时”的评论。它的作用方式是,您可以等待指定的时间来返回结果,并通过异常通知它是否已返回,或者您可以只对指定超时的结果对象发出等待。在这种情况下不会返回异常,但您可以测试处理结果的“作业”是否已完成。然而,您确实无法终止单个超时作业。但是,当您处理完所有未超时的结果后,您可以调用terminate池本身,这将终止池中的所有进程,无论它们是否空闲。这导致了该答案中的第二条评论,“突然终止进程可能会导致应用程序中出现奇怪的行为。” 这是正确的,具体取决于超时的作业正在执行的操作。因此,我们同意,如果这样做会导致奇怪的行为,我们不应该让工作超时并提前终止它们。但我不知道如何Pebble更好地处理这个问题。

该答案所回答的问题实际上有一种技术可以做你想做的事情。您需要放弃使用该map函数,转而使用apply_async指定回调函数,以便在结果可用时将其保存。在下面的示例中,我仅出于演示目的而使用 5 秒的 TIMEOUT 值,并已为我提交的 10 个作业中的大约一半安排了超时。我已经预先分配了一个名为的结果列表,squares该列表将保存 10 个结果,并且已使用 10 个None值进行初始化。如果第 i值是None我们全部完成时的值,那是因为正在处理值的作业i超时。我的工作函数还返回其参数v及其计算值v ** 2,以便回调函数知道squares计算结果应位于列表中的哪个槽:

import multiprocessing as mp
import time

def my_task(v):
    time.sleep(v)
    return v, v ** 2

squares = [None] * 10

def my_callback(t):
    i, s = t
    squares[i] = s


TIMEOUT = 5

if __name__ == '__main__':
    vs = range(10)
    pool = mp.Pool()
    results = [pool.apply_async(my_task, args=(v,), callback=my_callback) for v in vs]
    time.sleep(TIMEOUT)
    pool.terminate() # all processes, busy or idle, will be terminated
    print(squares)
Run Code Online (Sandbox Code Playgroud)

印刷:

[0, 1, 4, 9, 16, None, None, None, None, None]
Run Code Online (Sandbox Code Playgroud)

第二种更复杂的方法不使用回调函数。相反,它对get调用返回的每个AsynchResult实例进行调用以pool.apply_async指定超时值。这里棘手的一点是,对于初始调用,您必须使用完整的超时值。但是当结果返回或者你遇到超时异常时,你已经等待了一段时间t。这意味着下次您获得超时结果时,您指定的超时值应减少t

import multiprocessing as mp
import time

def my_task(v):
    time.sleep(6 if v == 0 else v)
    return v ** 2


TIMEOUT = 5

if __name__ == '__main__':
    vs = range(mp.cpu_count() - 1) # 7 on my desktop
    pool = mp.Pool() # poolsize is 8
    results = [pool.apply_async(my_task, args=(v,)) for v in vs]
    time_to_wait = TIMEOUT # initial time to wait
    start_time = time.time()
    for i, result in enumerate(results):
        try:
            return_value = result.get(time_to_wait) # wait for up to time_to_wait seconds
        except mp.TimeoutError:
            print('Timeout for v = ', i)
        else:
            print(f'Return value for v = {i} is {return_value}')
        # how much time has exprired since we began waiting?
        t = time.time() - start_time
        time_to_wait = TIMEOUT - t
        if time_to_wait < 0:
            time_to_wait = 0
    pool.terminate() # all processes, busy or idle, will be terminated
Run Code Online (Sandbox Code Playgroud)

印刷:

Timeout for v =  0
Return value for v = 1 is 1
Return value for v = 2 is 4
Return value for v = 3 is 9
Return value for v = 4 is 16
Timeout for v =  5
Timeout for v =  6
Run Code Online (Sandbox Code Playgroud)

笔记

通过使用apply_async而不是,可以使用chunksize为 1map有效地提交作业(请参阅的参数,它确定如何将可迭代参数分解为“块”并放入每个进程的输入队列中,以最大程度地减少共享内存传输的数量对于大型迭代,与 相比可能效率较低,后者根据池的大小和要处理的作业数量使用“合理”的默认块大小。chunksizemapapply_asyncmap