最后一个进程的python多处理映射错误处理

Lui*_*uis 5 python dictionary pool multiprocessing

有一个奇怪的行为,map使用Python的时候multiprocessing.Pool。在下面的示例中,一个由 4 个处理器组成的池将处理 28 个任务。这应该需要七遍,每遍需要 4 秒。

然而,它需要8个pass。在前六轮中,所有处理器都被占用。在第 7 次传递中,仅完成了两个任务(两个空闲处理器)。剩下的 2 个任务在第 8 次传递中完成(再次是两个空闲处理器)。这种行为出现在看似随机的 cpu 数量和任务数量组合中,不必要地浪费时间。

此示例已在 Intel Xeon Haswell(20 核)和 Intel i7(4 核)上重现。

关于如何强制Pool在所有通道中使用所有可用处理器的任何想法?

import time
import multiprocessing
from multiprocessing import Pool
import datetime

def f(values):
    now = str(datetime.datetime.now())
    proc_id = str(multiprocessing.current_process())
    print(proc_id+' '+now)
    a=values**2
    time.sleep(4)
    return a 

if __name__ == '__main__':
    p = Pool(4) #number of processes
    processed_values= p.map( f, range(28))
    p.close()
    p.join()
    print processed_values
Run Code Online (Sandbox Code Playgroud)

运行的输出如下

<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:49.604065
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:49.604189
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:49.604252
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:49.604866
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:53.608475
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:53.608878
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:53.608931
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:53.609503
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:57.612831
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:57.613135
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:57.613555
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:57.614065
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:01.616974
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:01.617273
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:01.617699
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:01.618190
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:05.621284
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:05.621489
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:05.622130
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:05.622404
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:09.625522
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:09.625631
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:09.626555
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:09.626566
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:13.629761
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:13.629846
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:17.634003
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:17.634317
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729]
Run Code Online (Sandbox Code Playgroud)

这与以下问题有关,该问题没有明确或正确的答案。 Python:多处理映射需要更长的时间才能完成最后几个进程

dan*_*ano 5

这是由于Pool.map将传递给它的可迭代对象分块并将其发送给Pool. 如果您将 the 强制chunksize为 1,您将看到您期望的行为:

import time
import multiprocessing
from multiprocessing import Pool
import datetime

def f(values):
    now = str(datetime.datetime.now())
    proc_id = str(multiprocessing.current_process())
    print(proc_id+' '+now)
    a=values**2
    time.sleep(4)
    return a 

if __name__ == '__main__':
    p = Pool(4) #number of processes
    processed_values= p.map( f, range(28), chunksize=1)
    p.close()
    p.join()
    print processed_values
Run Code Online (Sandbox Code Playgroud)

输出:

<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:06.548733
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:06.548803
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:06.549013
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:06.549052
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:10.549509
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:10.551091
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:10.553057
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:10.553263
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:14.553765
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:14.553821
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:14.554953
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:14.557262
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:18.556535
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:18.556611
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:18.558019
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:18.561597
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:22.560039
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:22.560097
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:22.562236
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:22.565912
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:26.564383
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:26.564430
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:26.564589
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:26.570232
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:30.568634
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:30.568647
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:30.568752
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:30.574456
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729]
Run Code Online (Sandbox Code Playgroud)

map当您不提供块大小时,用于选择块大小的算法如下所示:

    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
Run Code Online (Sandbox Code Playgroud)

对于大小为 28 的迭代,结果为 2。这意味着每个工作进程一次从您的迭代中抓取两个项目,而不是一个。因此,当队列中只剩下四个项目时,第一个空闲的工作人员得到两个,第二个空闲的工作人员得到两个,其他两个工作人员就没有更多了。

分块的首要原因是通过减少 IPC 开销,在处理非常大的可迭代对象时大大提高了性能。对于较小的可迭代对象,它往往不会产生太大影响,甚至不会影响性能,就像在这种情况下一样。