Jia*_* Lu 5 python multiprocessing
pool map()在Python中使用时是否可以设置操作的时间限制multiprocessing。当达到时间限制时,所有子进程都会停止并返回它们已有的结果。
import multiprocessing as mp
def task(v):
do something
return result
if __name__ == '__main__':
vs = [...]
p= mp.Pool()
results = p.map(task, vs)
Run Code Online (Sandbox Code Playgroud)
在上面的例子中,我有一个非常大的列表vs。理想情况下,列表中的所有元素都vs将被发送到 function task(),并且所有结果都将保存在resultslist 中。
然而,由于列表vs非常大,而且我执行此过程的时间有限(比如 5 分钟)。我需要的是在达到 5 分钟时停止该map过程,并将计算结果返回到 list results。
编辑1:
我不会终止需要超过 5 分钟才能完成的任务。假设我的列表中有 1000 个任务vs,5 分钟后只完成了 600 个任务。我需要的是杀死所有子进程,并将这 600 个任务的结果保存到列表中results。
我查看了ForceBru 提到的答案,它使用了名为Pebble. 首先,我不明白有关“Python标准池不支持超时”的评论。它的作用方式是,您可以等待指定的时间来返回结果,并通过异常通知它是否已返回,或者您可以只对指定超时的结果对象发出等待。在这种情况下不会返回异常,但您可以测试处理结果的“作业”是否已完成。然而,您确实无法终止单个超时作业。但是,当您处理完所有未超时的结果后,您可以调用terminate池本身,这将终止池中的所有进程,无论它们是否空闲。这导致了该答案中的第二条评论,“突然终止进程可能会导致应用程序中出现奇怪的行为。” 这是正确的,具体取决于超时的作业正在执行的操作。因此,我们同意,如果这样做会导致奇怪的行为,我们不应该让工作超时并提前终止它们。但我不知道如何Pebble更好地处理这个问题。
该答案所回答的问题实际上有一种技术可以做你想做的事情。您需要放弃使用该map函数,转而使用apply_async指定回调函数,以便在结果可用时将其保存。在下面的示例中,我仅出于演示目的而使用 5 秒的 TIMEOUT 值,并已为我提交的 10 个作业中的大约一半安排了超时。我已经预先分配了一个名为的结果列表,squares该列表将保存 10 个结果,并且已使用 10 个None值进行初始化。如果第 i个值是None我们全部完成时的值,那是因为正在处理值的作业i超时。我的工作函数还返回其参数v及其计算值v ** 2,以便回调函数知道squares计算结果应位于列表中的哪个槽:
import multiprocessing as mp
import time
def my_task(v):
time.sleep(v)
return v, v ** 2
squares = [None] * 10
def my_callback(t):
i, s = t
squares[i] = s
TIMEOUT = 5
if __name__ == '__main__':
vs = range(10)
pool = mp.Pool()
results = [pool.apply_async(my_task, args=(v,), callback=my_callback) for v in vs]
time.sleep(TIMEOUT)
pool.terminate() # all processes, busy or idle, will be terminated
print(squares)
Run Code Online (Sandbox Code Playgroud)
印刷:
[0, 1, 4, 9, 16, None, None, None, None, None]
Run Code Online (Sandbox Code Playgroud)
第二种更复杂的方法不使用回调函数。相反,它对get调用返回的每个AsynchResult实例进行调用以pool.apply_async指定超时值。这里棘手的一点是,对于初始调用,您必须使用完整的超时值。但是当结果返回或者你遇到超时异常时,你已经等待了一段时间t。这意味着下次您获得超时结果时,您指定的超时值应减少t:
import multiprocessing as mp
import time
def my_task(v):
time.sleep(6 if v == 0 else v)
return v ** 2
TIMEOUT = 5
if __name__ == '__main__':
vs = range(mp.cpu_count() - 1) # 7 on my desktop
pool = mp.Pool() # poolsize is 8
results = [pool.apply_async(my_task, args=(v,)) for v in vs]
time_to_wait = TIMEOUT # initial time to wait
start_time = time.time()
for i, result in enumerate(results):
try:
return_value = result.get(time_to_wait) # wait for up to time_to_wait seconds
except mp.TimeoutError:
print('Timeout for v = ', i)
else:
print(f'Return value for v = {i} is {return_value}')
# how much time has exprired since we began waiting?
t = time.time() - start_time
time_to_wait = TIMEOUT - t
if time_to_wait < 0:
time_to_wait = 0
pool.terminate() # all processes, busy or idle, will be terminated
Run Code Online (Sandbox Code Playgroud)
印刷:
Timeout for v = 0
Return value for v = 1 is 1
Return value for v = 2 is 4
Return value for v = 3 is 9
Return value for v = 4 is 16
Timeout for v = 5
Timeout for v = 6
Run Code Online (Sandbox Code Playgroud)
笔记
通过使用apply_async而不是,可以使用chunksize为 1map有效地提交作业(请参阅的参数,它确定如何将可迭代参数分解为“块”并放入每个进程的输入队列中,以最大程度地减少共享内存传输的数量对于大型迭代,与 相比可能效率较低,后者根据池的大小和要处理的作业数量使用“合理”的默认块大小。chunksizemapapply_asyncmap
| 归档时间: |
|
| 查看次数: |
4998 次 |
| 最近记录: |