Multiprocessing.Pool使Numpy矩阵乘法更慢

Fra*_*ter 15 python numpy pool multiprocessing

所以,我玩弄multiprocessing.PoolNumpy,但似乎我错过了一些重要的一点.为什么pool版本慢得多?我查看了htop,我可以看到创建了几个进程,但它们共享一个CPU,最多可达100%.

$ cat test_multi.py 
import numpy as np
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':
    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    pool = Pool(8)
    print timeit(lambda: map(mmul, matrices), number=20)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

$ python test_multi.py 
16.0265390873
19.097837925
Run Code Online (Sandbox Code Playgroud)

[更新]

  • 改为timeit基准测试流程
  • init池,包含许多核心
  • 改变计算,以便有更多的计算和更少的内存传输(我希望)

仍然没有变化.pool版本仍然较慢,我可以看到htop,只使用了一个核心,也产生了几个进程.

[UPDATE2]

目前我正在阅读@ Jan-Philip Gehrcke的使用multiprocessing.Process()和建议Queue.但与此同时我想知道:

  1. 为什么我的例子适用于蒂亚戈?可能是因为它不能在我的机器上工作1
  2. 我的示例代码是否在进程之间进行了任何复制?我打算让我的代码给每个线程一个矩阵列表.
  3. 我的代码是一个坏的例子,因为我使用Numpy

我了解到,当其他人知道我的最终目标时,通常会得到更好的答案:我有很多文件,这些文件是以连续方式加载和处理的.处理是CPU密集型的,所以我假设可以通过并行化获得很多.我的目的是调用并行分析文件的python函数.此外,这个函数只是C代码的接口,我认为,这有所不同.

1 Ubuntu 12.04,Python 2.7.3,i7 860 @ 2.80 - 如果您需要更多信息,请发表评论.

[UPDATE3]

以下是Stefano示例代码的结果.出于某种原因,没有加速.:/

testing with 16 matrices
base  4.27
   1  5.07
   2  4.76
   4  4.71
   8  4.78
  16  4.79
testing with 32 matrices
base  8.82
   1 10.39
   2 10.58
   4 10.73
   8  9.46
  16  9.54
testing with 64 matrices
base 17.38
   1 19.34
   2 19.62
   4 19.59
   8 19.39
  16 19.34
Run Code Online (Sandbox Code Playgroud)

[更新4]回答Jan-Philip Gehrcke的评论

对不起,我没有让自己更清楚.正如我在Update 2中所写的,我的主要目标是并行化第三方Python库函数的许多串行调用.此函数是某些C代码的接口.我被推荐使用Pool,但是这没有用,所以我尝试了一些更简单的东西,如上面的例子所示numpy.但是在那里我无法实现性能提升,即使它在寻找我'贪图并行化'.所以我认为我一定错过了重要的事情.这个信息是我正在寻找的这个问题和赏金.

[更新5]

感谢您的巨大投入.但阅读你的答案只会给我带来更多问题.因此,当我更清楚地了解我不知道的内容时,我会阅读基础知识并创建新的SO问题.

ali*_*i_m 18

关于所有进程都在同一个CPU上运行的事实,请参阅我的答案.

在导入期间,numpy更改父进程的CPU关联性,这样当您稍后使用Pool它生成的所有工作进程时,最终将争夺同一核心,而不是使用计算机上可用的所有核心.

您可以taskset在导入后调用numpy以重置CPU亲和性,以便使用所有核心:

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':

    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    print timeit(lambda: map(mmul, matrices), number=20)

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all cores
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)
    print timeit(lambda: pool.map(mmul, matrices), number=20)
Run Code Online (Sandbox Code Playgroud)

输出:

    $ python tmp.py                                     
    12.4765810966
    pid 29150's current affinity mask: 1
    pid 29150's new affinity mask: ff
    13.4136221409
Run Code Online (Sandbox Code Playgroud)

如果top在运行此脚本时使用CPU使用率,则应在执行"并行"部分时使用所有核心来查看它.正如其他人所指出的那样,在您的原始示例中,酸洗数据,流程创建等所涉及的开销可能超过并行化带来的任何可能的好处.

编辑:我怀疑单个进程似乎始终更快的部分原因是,numpy可能有一些技巧可以加速在多个核心上分散作业时无法使用的元素矩阵乘法.

例如,如果我只使用普通的Python列表来计算Fibonacci序列,那么我可以从并行化中获得巨大的加速.同样,如果我以不利于矢量化的方式进行逐元素乘法,我会得到类似的并行版本的加速:

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool

def fib(dummy):
    n = [1,1]
    for ii in xrange(100000):
        n.append(n[-1]+n[-2])

def silly_mult(matrix):
    for row in matrix:
        for val in row:
            val * val

if __name__ == '__main__':

    dt = timeit(lambda: map(fib, xrange(10)), number=10)
    print "Fibonacci, non-parallel: %.3f" %dt

    matrices = [np.random.randn(1000,1000) for ii in xrange(10)]
    dt = timeit(lambda: map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, non-parallel: %.3f" %dt

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all CPUS
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)

    dt = timeit(lambda: pool.map(fib,xrange(10)), number=10)
    print "Fibonacci, parallel: %.3f" %dt

    dt = timeit(lambda: pool.map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, parallel: %.3f" %dt
Run Code Online (Sandbox Code Playgroud)

输出:

$ python tmp.py
Fibonacci, non-parallel: 32.449
Silly matrix multiplication, non-parallel: 40.084
pid 29528's current affinity mask: 1
pid 29528's new affinity mask: ff
Fibonacci, parallel: 9.462
Silly matrix multiplication, parallel: 12.163
Run Code Online (Sandbox Code Playgroud)


Jan*_*cke 13

通信开销和计算加速之间的不可预测的竞争肯定是这里的问题.你所观察到的完全没问题.您是否获得净加速取决于许多因素,并且必须正确量化(如您所做).

那么为什么multiprocessing你的情况会如此"出乎意料地慢"呢? multiprocessingmapmap_async功能实际上咸菜Python对象来回通过与子进程连接父管道.这可能需要相当长的时间.在那段时间里,子进程几乎无所事事,这就是要看的内容htop.在不同的系统之间,可能存在相当大的管道传输性能差异,这也是为什么对于某些人来说你的池代码比单个CPU代码更快的原因,尽管对你来说它不是(其他因素可能在这里起作用,这只是一个例子,以解释效果).

你能做些什么来加快速度?

  1. 不要在POSIX兼容系统上挑选输入.

    如果您使用的是Unix,则可以利用POSIX的进程fork行为(写入时复制内存)来绕过parent-> child通信开销:

    全局可访问的变量创建父进程中工作输入(例如,大型矩阵列表).然后通过multiprocessing.Process()自己调用来创建工作进程.在子项中,从全局变量中获取作业输入.简单地说,这使得孩子访问父母的内存而没有任何通信开销(*,下面的解释).通过例如a将结果发送回父级multiprocessing.Queue.这将节省大量的通信开销,特别是如果输出与输入相比较小.此方法不适用于Windows,因为multiprocessing.Process()它创建了一个全新的Python进程,不会继承父进程的状态.

  2. 利用numpy多线程. 根据您的实际计算任务,可能会发生涉及multiprocessing根本没有帮助的事情.如果您自己编译numpy并启用OpenMP指令,那么对大型矩阵的操作可能会非常有效地多线程(并且分布在许多CPU内核上; GIL不是限制因素).基本上,这是在numpy/scipy的上下文中可以获得的多个CPU核心的最有效使用.

*孩子一般不能直接访问父母的记忆.但是,之后fork(),父母和孩子处于同等状态.将父级的整个内存复制到RAM中的另一个位置是愚蠢的.这就是写入时复制原理跳入的原因.只要子进程没有改变其内存状态,它就会实际访问父进程的内存.只有在修改时,相应的位和片才被复制到子节点的存储空间中.

主要编辑:

让我添加一段代码,用多个工作进程处理大量输入数据,并遵循建议"1.不要在兼容POSIX的系统上挑选输入." 此外,传回工人经理(父流程)的信息量非常低.该示例的重计算部分是单值分解.它可以大量使用OpenMP.我已多次执行该示例:

  • 一次使用1,2或4个工作进程OMP_NUM_THREADS=1,因此每个工作进程创建100%的最大负载.在那里,所提到的工人数量 - 计算时间缩放行为几乎是线性的,并且净加速因子上升对应于所涉及的工人数量.
  • 一次使用1,2或4个工作进程OMP_NUM_THREADS=4,以便每个进程创建400%的最大负载(通过生成4个OpenMP线程).我的机器有16个真正的核心,因此4个进程最多400%的负载,几乎可以从机器中获得最大的性能.缩放不再是完全线性的,并且加速因子不是所涉及的工作者的数量,但绝对计算时间OMP_NUM_THREADS=1与工作进程的数量相比显着减少并且时间仍然显着减少.
  • 一旦有更大的输入数据,4个核心,和OMP_NUM_THREADS=4.它导致平均系统负载为1253%.
  • 一次使用与上次相同的设置,但是OMP_NUM_THREADS=5.它导致平均系统负载为1598%,这表明我们从16核心机器获得了所有东西.然而,与后一种情况相比,实际计算壁时间没有改善.

代码:

import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing


# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.


MATRIX_SIZE = 1000
MATRIX_COUNT = 16


def rnd_matrix():
    offset = np.random.randint(1,10)
    stretch = 2*np.random.rand()+0.1
    return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)


print "Creating input matrices in parent process."
# Create input in memory. Children access this input.
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]


def worker_function(result_queue, worker_index, chunk_boundary):
    """Work on a certain chunk of the globally defined `INPUT` list.
    """
    result_chunk = []
    for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
        # Perform single value decomposition (CPU intense).
        u, s, v = svd(m)
        # Build single numeric value as output.
        output =  int(np.sum(s))
        result_chunk.append(output)
    result_queue.put((worker_index, result_chunk))


def work(n_workers=1):
    def calc_chunksize(l, n):
        """Rudimentary function to calculate the size of chunks for equal 
        distribution of a list `l` among `n` workers.
        """
        return int(math.ceil(len(l)/float(n)))

    # Build boundaries (indices for slicing) for chunks of `INPUT` list.
    chunk_size = calc_chunksize(INPUT, n_workers)
    chunk_boundaries = [
        (i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]

    # When n_workers and input list size are of same order of magnitude,
    # the above method might have created less chunks than workers available. 
    if n_workers != len(chunk_boundaries):
        return None

    result_queue = multiprocessing.Queue()
    # Prepare child processes.
    children = []
    for worker_index in xrange(n_workers):
        children.append(
            multiprocessing.Process(
                target=worker_function,
                args=(
                    result_queue,
                    worker_index,
                    chunk_boundaries[worker_index],
                    )
                )
            )

    # Run child processes.
    for c in children:
        c.start()

    # Create result list of length of `INPUT`. Assign results upon arrival.
    results = [None] * len(INPUT)

    # Wait for all results to arrive.
    for _ in xrange(n_workers):
        worker_index, result_chunk = result_queue.get(block=True)
        chunk_boundary = chunk_boundaries[worker_index]
        # Store the chunk of results just received to the overall result list.
        results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk

    # Join child processes (clean up zombies).
    for c in children:
        c.join()
    return results


def main():
    durations = []
    n_children = [1, 2, 4]
    for n in n_children:
        print "Crunching input with %s child(ren)." % n
        t0 = time.time()
        result = work(n)
        if result is None:
            continue
        duration = time.time() - t0
        print "Result computed by %s child process(es): %s" % (n, result)
        print "Duration: %.2f s" % duration
        durations.append(duration)
    normalized_durations = [durations[0]/d for d in durations]
    for n, normdur in zip(n_children, normalized_durations):
        print "%s-children speedup: %.2f" % (n, normdur)


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

输出:

$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps

$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps
Run Code Online (Sandbox Code Playgroud)


jef*_*upp 2

默认情况下,Pool仅使用 n 个进程,其中 n 是计算机上的 CPU 数量。您需要指定希望它使用多少个进程,例如Pool(5)

请参阅此处了解更多信息