多处理池是否为每个进程提供相同数量的任务,或者它们是否可用?

Joh*_*Mee 18 python pool multiprocessing

当你map可以迭代到a multiprocessing.Pool时,迭代被划分为一开始池中每个进程的队列,或者是否有一个公共队列,当进程空闲时从中获取任务?

    def generate_stuff():
        for foo in range(100):
             yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()
Run Code Online (Sandbox Code Playgroud)

所以考虑到这个未经测试的建议代码; 如果池中有4个进程,每个进程都会分配25个要做的事情,或者100个进程被进程逐个挑选,寻找要做的事情,以便每个进程可以执行不同数量的东西,例如30 ,26,24,20.

aba*_*ert 23

所以考虑到这个未经测试的建议代码; 如果池中有4个进程,每个进程都会分配25个要做的事情,或者100个进程被进程逐个挑选,寻找要做的事情,以便每个进程可以执行不同数量的东西,例如30 ,26,24,20.

嗯,显而易见的答案是测试它.

原样,测试可能不会告诉你太多,因为工作将尽快完成,并且即使池化进程在准备就绪时抓住了工作,事情也可能最终均匀分布.但有一个简单的方法来解决这个问题:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)
Run Code Online (Sandbox Code Playgroud)

如果这些数字是"锯齿状",你就知道汇集的进程必须抓住新的工作就好了.(我明确地设置chunksize为1以确保块不是那么大,每个块首先只获得一个块.)

当我在8核机器上运行时:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})
Run Code Online (Sandbox Code Playgroud)

因此,看起来流程正在快速获得新的工作.

既然你特别问约4个工人,我换Pool()Pool(4)和得到这个:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})
Run Code Online (Sandbox Code Playgroud)

但是,有一种更好的方法可以找到而不是通过测试:阅读源代码.

正如您所看到的,map只是调用map_async,它创建了一堆批处理并将它们放在一个self._taskqueue对象(一个Queue.Queue实例)上.如果你进一步阅读,这个队列不会直接与其他进程共享,但是有一个池管理器线程,只要进程完成并返回结果,就会从队列中弹出下一个作业并将其提交回进程.

这也是你如何找出默认chunksize的用途map.上面链接的2.7实现显示它只是len(iterable) / (len(self._pool) * 4)四舍五入(比避免分数算术的更冗长) - 或者换句话说,只是大到足以容纳每个进程大约4个块.但你真的不应该依赖于此; 文档模糊地和间接地暗示它将使用某种启发式方法,但不会给你任何关于它将是什么的保证.因此,如果您确实需要"每个进程大约4个块",请明确计算它.更现实的是,如果您需要除默认值之外的任何内容,您可能需要一个特定于域的值(通过计算,猜测或分析).