Using more worker processes than cores

Bra*_*mon 5 python parallel-processing optimization multiprocessing python-multiprocessing

This example from PyMOTW constructs a multiprocessing.Pool() in which the processes argument passed in (the number of worker processes) is twice the number of cores on the machine.

pool_size = multiprocessing.cpu_count() * 2

(Otherwise the class defaults to just cpu_count().)

Is there any rationale for this? What is the effect of creating more workers than there are cores? Is there ever a case for doing so, or does it just add overhead in the wrong direction? I am curious why this is consistently included in examples on what I consider to be a reputable site.

In initial testing, it actually seems to slow things down:

$ python -m timeit -n 25 -r 3 'import double_cpus; double_cpus.main()'
25 loops, best of 3: 266 msec per loop
$ python -m timeit -n 25 -r 3 'import default_cpus; default_cpus.main()'
25 loops, best of 3: 226 msec per loop

double_cpus.py

import multiprocessing

def do_calculation(n):
    for i in range(n):
        i ** 2

def main():
    with multiprocessing.Pool(
        processes=multiprocessing.cpu_count() * 2,
        maxtasksperchild=2,
    ) as pool:
        pool.map(do_calculation, range(1000))

default_cpus.py

import multiprocessing

def do_calculation(n):
    for i in range(n):
        i ** 2

def main():
    # `processes` will default to cpu_count()
    with multiprocessing.Pool(
        maxtasksperchild=2,
    ) as pool:
        pool.map(do_calculation, range(1000))

Dar*_*aut 5

Doing this can make sense if your job is not purely cpu-bound, but also involves some I/O.

The computation in your example is also too short for a reasonable benchmark; the overhead of creating the additional processes in the first place dominates.
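To see that start-up cost in isolation, here is a minimal sketch (not from the original answer; `noop` and `measure_pool_overhead` are made-up names) that times nothing but pool creation, dispatch of trivial tasks, and teardown for the default and the doubled worker count:

import time
import multiprocessing

def noop(_):
    pass

def measure_pool_overhead(n_workers):
    # Time only pool start-up, dispatch of trivial no-op tasks, and teardown.
    start = time.perf_counter()
    with multiprocessing.Pool(processes=n_workers) as pool:
        pool.map(noop, range(n_workers))
    return time.perf_counter() - start

if __name__ == "__main__":
    cores = multiprocessing.cpu_count()
    print("default pool:", measure_pool_overhead(cores))
    print("doubled pool:", measure_pool_overhead(cores * 2))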

I modified your calculation so that it iterates over a range of 10M while evaluating an if-condition, and takes a nap whenever that condition is True, which happens n_sleep times. That way a total sleep of sleep_sec_total can be injected into the computation.

# default_cpus.py
import time
import multiprocessing


def do_calculation(iterations, n_sleep, sleep_sec):
    for i in range(iterations):
        if i % (iterations / n_sleep) == 0:
            time.sleep(sleep_sec)


def main(sleep_sec_total):

    iterations = int(10e6)
    n_sleep = 100
    sleep_sec = sleep_sec_total / n_sleep
    tasks = [(iterations, n_sleep, sleep_sec)] * 20

    with multiprocessing.Pool(
        maxtasksperchild=2,
    ) as pool:
        pool.starmap(do_calculation, tasks)
# double_cpus.py
...

def main(sleep_sec_total):

    iterations = int(10e6)
    n_sleep = 100
    sleep_sec = sleep_sec_total / n_sleep
    tasks = [(iterations, n_sleep, sleep_sec)] * 20

    with multiprocessing.Pool(
        processes=multiprocessing.cpu_count() * 2,
        maxtasksperchild=2,
    ) as pool:
        pool.starmap(do_calculation, tasks)

I ran the benchmark with sleep_sec_total=0 (purely cpu-bound) and with sleep_sec_total=2 for both modules.

Results with sleep_sec_total=0:

$ python -m timeit -n 5 -r 3 'import default_cpus; default_cpus.main(0)'
5 loops, best of 3: 15.2 sec per loop

$ python -m timeit -n 5 -r 3 'import double_cpus; double_cpus.main(0)'
5 loops, best of 3: 15.2 sec per loop

Given a reasonable computation size, you'll observe next to no difference between the default and doubled number of processes for a purely CPU-bound task: the extra processes just get time-sliced onto the same cores. Here it happened that both runs had the same best time.

Results with sleep_sec_total=2:

$ python -m timeit -n 5 -r 3 'import default_cpus; default_cpus.main(2)'
5 loops, best of 3: 20.5 sec per loop
$ python -m timeit -n 5 -r 3 'import double_cpus; double_cpus.main(2)'
5 loops, best of 3: 17.7 sec per loop

Now, with 2 seconds of sleep added as a stand-in for I/O, the picture looks different. Using twice as many processes gave a speed-up of about 3 seconds compared to the default: while some workers sleep, the extra processes keep the cores busy.

  • Note that this applies specifically to the case where the individual tasks *mix* CPU-bound and I/O-bound steps. If the tasks are mostly I/O-bound, you'd usually just use `multiprocessing.dummy.Pool` instead, so the pool is backed by threads rather than processes, removing the inter-process communication overhead (see the sketch below). In that case you may also want more than `#cores * 2` workers; `concurrent.futures.ThreadPoolExecutor` (which is not bound by the documented `multiprocessing.Pool` API) defaults to `#cores * 5` (which may be too high for spinning-disk-bound tasks, but works for just about anything else). (6)
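For the mostly-I/O-bound case mentioned in the comment above, a thread-backed pool is usually enough. A minimal sketch, assuming a simple URL-fetching workload (the `URLS` list and the `fetch` helper are hypothetical placeholders):

import urllib.request
from multiprocessing.dummy import Pool  # same API as multiprocessing.Pool, but backed by threads

# Placeholder workload: replace with real URLs.
URLS = ["https://example.com/"] * 20

def fetch(url):
    # Each call spends most of its time waiting on the network,
    # so many threads can overlap their waits on a few cores.
    with urllib.request.urlopen(url) as resp:
        return len(resp.read())

if __name__ == "__main__":
    with Pool(processes=16) as pool:  # far more workers than cores is reasonable here
        sizes = pool.map(fetch, URLS)
    print(sizes)

Because the workers are threads, there is no pickling of arguments or results and no process start-up cost, and the GIL is released while waiting on sockets, so the waits genuinely overlap.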