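I am trying to make use of Python's multiprocessing Pool.
No matter how I set the chunk size (under Windows 7 and under Ubuntu, the latter with 4 cores, see below), the number of parallel threads seems to stay the same.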
from multiprocessing import Pool
from multiprocessing import cpu_count
import multiprocessing
import time


def f(x):
    # Report which worker process handles item x, before and after the sleep.
    print("ready to sleep", x, multiprocessing.current_process())
    time.sleep(20)
    print("slept with:", x, multiprocessing.current_process())


if __name__ == '__main__':
    processes = cpu_count()
    print('-' * 20)
    print('Utilizing %d cores' % processes)
    print('-' * 20)
    pool = Pool(processes)
    myList = []
    runner = 0
    while runner < 40:
        myList.append(runner)
        runner += 1
    print("len(myList):", len(myList))

    # chunksize = int(len(myList) / processes)
    # chunksize = processes
    chunksize = 1
    print("chunksize:", chunksize)
    # Note: the literal 1 is passed as chunksize here, not the variable above.
    pool.map(f, myList, 1)
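The behavior is the same whether I use chunksize = int(len(myList) / processes), chunksize = processes, or 1 (as in the example above).
Could it be that chunksize is automatically set to the number of cores?
Example for chunksize = 1: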
--------------------
Utilizing 4 cores
--------------------
len(myList): 40
chunksize: 10
ready to sleep 0 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 1 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 2 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 3 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 0 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 4 <ForkProcess(ForkPoolWorker-1, started daemon)>
slept with: 1 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 5 <ForkProcess(ForkPoolWorker-2, started daemon)>
slept with: 2 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 6 <ForkProcess(ForkPoolWorker-3, started daemon)>
slept with: 3 <ForkProcess(ForkPoolWorker-4, started daemon)>
ready to sleep 7 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 4 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 8 <ForkProcess(ForkPoolWorker-1, started daemon)>
slept with: 5 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 9 <ForkProcess(ForkPoolWorker-2, started daemon)>
slept with: 6 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 10 <ForkProcess(ForkPoolWorker-3, started daemon)>
slept with: 7 <ForkProcess(ForkPoolWorker-4, started daemon)>
ready to sleep 11 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 8 <ForkProcess(ForkPoolWorker-1, started daemon)>
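Chunksize doesn't influence how many cores are used; that is set by the processes parameter of Pool. Chunksize sets how many items of the iterable you pass to Pool.map are distributed at once to a single worker process, in what Pool calls a "task" (the figure below shows Python 3.7.1).
If you set chunksize=1, a worker process gets fed a new item, in a new task, only after finishing the one it received before. With chunksize > 1, a worker gets a whole batch of items at once within a task, and when it is finished, it gets the next batch if any are left.
Distributing items one by one with chunksize=1 increases scheduling flexibility while it decreases overall throughput, because drip feeding requires more inter-process communication (IPC).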
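To make the batching concrete, here is a minimal sketch of how an iterable could be cut into tasks of at most chunksize items. The helper name split_into_tasks is hypothetical and only illustrates the idea; it is not part of Pool's public API.

```python
from itertools import islice


def split_into_tasks(items, chunksize):
    """Yield consecutive batches of at most `chunksize` items (illustrative helper)."""
    it = iter(items)
    while True:
        batch = tuple(islice(it, chunksize))
        if not batch:
            return
        yield batch


items = range(10)
# chunksize=1: ten tasks, each carrying a single item.
print(list(split_into_tasks(items, 1)))
# chunksize=4: three tasks, the last one holding the two leftover items.
print(list(split_into_tasks(items, 4)))
```

Either way the number of worker processes stays whatever was passed to Pool; only the number and size of the tasks changes.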
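In my in-depth analysis of Pool's chunksize algorithm here, I define the unit of work for processing one item of the iterable as a taskel, to avoid a naming conflict with Pool's use of the word "task". A task (as a unit of work) consists of chunksize taskels.
You would set chunksize=1 if you cannot predict how long a taskel will take to finish, for example in an optimization problem where the processing time varies greatly across taskels. Drip feeding here prevents a worker process from sitting on a pile of untouched items while it crunches on one heavy taskel, which would keep the other items in its task from being distributed to idle worker processes.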
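The effect of uneven taskels can be sketched with a small simulation. It assumes a simplified model in which chunks are handed out in order to whichever worker becomes free first, and it ignores IPC overhead entirely; simulated_makespan and the durations are made up for illustration.

```python
import heapq


def simulated_makespan(durations, n_workers, chunksize):
    """Wall time under a simplified model: chunks go, in order, to the first free worker."""
    chunks = [durations[i:i + chunksize] for i in range(0, len(durations), chunksize)]
    free_at = [0] * n_workers  # time at which each worker becomes idle
    heapq.heapify(free_at)
    for chunk in chunks:
        start = heapq.heappop(free_at)  # earliest available worker
        heapq.heappush(free_at, start + sum(chunk))
    return max(free_at)


# One heavy taskel followed by seven light ones (hypothetical durations).
durations = [8, 1, 1, 1, 1, 1, 1, 1]
print(simulated_makespan(durations, n_workers=2, chunksize=4))  # 11
print(simulated_makespan(durations, n_workers=2, chunksize=1))  # 8
```

With chunksize=4 the worker holding the heavy taskel also has to process three more items while the other worker idles; with chunksize=1 those items go to the idle worker instead.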
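Otherwise, if all your taskels need the same time to finish, you can set chunksize=len(iterable) // processes, so that tasks are distributed only once across all workers. Note that this will produce one more task than there are processes (processes + 1) if len(iterable) / processes has a remainder. This has the potential to severely impact your overall computation time. Read more about this in the previously linked answer.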
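The remainder case is easy to check with a little arithmetic. The sketch below uses a hypothetical helper task_count and made-up sizes (42 and 40 items, 4 processes).

```python
def task_count(n_items, chunksize):
    """Number of tasks needed to cover n_items in batches of chunksize (ceiling division)."""
    return -(-n_items // chunksize)


processes = 4

# 42 items: 42 // 4 == 10, which leaves 2 items over for an extra, fifth task.
chunksize = 42 // processes
print(chunksize, task_count(42, chunksize))  # 10 5

# 40 items: divides evenly, so there is exactly one task per process.
chunksize = 40 // processes
print(chunksize, task_count(40, chunksize))  # 10 4
```

In the first case one worker has to pick up the small fifth task after everyone has finished their first one, which stretches the total run time.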
FYI, this is the part of the source code where Pool internally calculates the chunksize if it is not set:
# Python 3.6, line 378 in `multiprocessing.pool.py`
if chunksize is None:
    chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    if extra:
        chunksize += 1
if len(iterable) == 0:
    chunksize = 0
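As a rough illustration, the same calculation can be wrapped in a standalone function and applied to the numbers from the question (40 items, 4 workers). The name default_chunksize is hypothetical; the body just mirrors the excerpt above.

```python
def default_chunksize(n_items, n_workers):
    """Mirror of the excerpt above: aim for roughly four tasks per worker."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1  # round up so no items are left out
    if n_items == 0:
        chunksize = 0
    return chunksize


# divmod(40, 16) == (2, 8), so the chunksize is rounded up to 3.
print(default_chunksize(40, 4))  # 3
```

So when no chunksize is passed, it is derived from both the length of the iterable and the number of workers, not simply set to the number of cores.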