如何通过多重处理将函数应用于2D numpy数组

Tra*_*sIV 3 python arrays numpy multiprocessing

假设我具有以下功能:

def f(x,y):
    return x*y
Run Code Online (Sandbox Code Playgroud)

如何使用多重处理模块将功能应用于NxM 2D numpy数组中的每个元素?使用串行迭代,代码可能如下所示:

import numpy as np
N = 10
M = 12
results = np.zeros(shape=(N,M))
for x in range(N):
    for y in range(M):
        results[x,y] = f(x,y)
Run Code Online (Sandbox Code Playgroud)

ali*_*i_m 5

这是您可以使用并行化示例函数的方法multiprocesssing。我还包括一个几乎完全相同的使用非并行for循环的纯Python函数,以及一个达到相同结果的numpy单行代码:

import numpy as np
from multiprocessing import Pool


def f(x,y):
    return x * y

# this helper function is needed because map() can only be used for functions
# that take a single argument (see http://stackoverflow.com/q/5442910/1461210)
def splat_f(args):
    return f(*args)

# a pool of 8 worker processes
pool = Pool(8)

def parallel(M, N):
    results = pool.map(splat_f, ((i, j) for i in range(M) for j in range(N)))
    return np.array(results).reshape(M, N)

def nonparallel(M, N):
    out = np.zeros((M, N), np.int)
    for i in range(M):
        for j in range(N):
            out[i, j] = f(i, j)
    return out

def broadcast(M, N):
    return np.prod(np.ogrid[:M, :N])
Run Code Online (Sandbox Code Playgroud)

现在让我们看一下性能:

%timeit parallel(1000, 1000)
# 1 loops, best of 3: 1.67 s per loop

%timeit nonparallel(1000, 1000)
# 1 loops, best of 3: 395 ms per loop

%timeit broadcast(1000, 1000)
# 100 loops, best of 3: 2 ms per loop
Run Code Online (Sandbox Code Playgroud)

非并行的纯Python版本比并行的版本好4倍左右,而使用numpy数组广播的版本绝对压倒了其他两个版本。

问题在于,启动和停止Python子进程会带来很多开销,并且您的测试功能非常琐碎,以至于每个工作线程在整个生命周期中仅花费一小部分时间来完成有用的工作。仅当每个线程在被杀死之前有大量工作要做时,多处理才有意义。例如,您可能会给每个工作人员更大的输出数组块进行计算(尝试弄乱chunksize=参数to pool.map()),但是对于这样一个琐碎的示例,我怀疑您会看到很大的改进。

我不知道您的实际代码是什么样子-也许您的函数既庞大又昂贵,足以保证使用多处理。但是,我敢打赌,有很多更好的方法可以改善其性能。