Python 中的多重处理:并行化 for 循环以填充 Numpy 数组

Ant*_*ano 2 python parallel-processing numpy python-3.x python-multiprocessing

我一直在阅读类似这样的帖子,但其中任何一个似乎都适合我的情况。我正在尝试使用 Python 中的多重处理并行化以下玩具示例,以在 for 循环内填充 Numpy 数组:

import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        for idx, val in enumerate(range(1, n+1)):
            result = pool.apply_async(func1, [val])
            my_array[idx] = result.get()
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Normal time: {}\n".format(end-start))

    start_parallel = time.time()
    my_array_parallelized = func2(60000, parallel=True)
    end_parallel = time.time()

    print(my_array_parallelized)
    print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

基于多处理的代码中的行似乎有效并为您提供了正确的结果。然而,它比非并行化版本花费的时间要长得多。这是两个版本的输出

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Normal time: 0.01605963706970215

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Time based on multiprocessing: 2.8775112628936768
Run Code Online (Sandbox Code Playgroud)

我的直觉告诉我,这应该是从 pool.apply_async() 捕获结果的更好方法。我究竟做错了什么?实现这一目标最有效的方法是什么?谢谢。

Jér*_*ard 5

创建进程的成本很高。在我的机器上,创建每个进程至少需要几百微秒。此外,多处理模块在进程之间复制要计算的数据,然后从进程池中收集结果。这种进程间通信也非常慢。问题是你的计算是微不足道的,并且可以非常快地完成,可能比所有引入的开销快得多。多重处理模块仅在处理相当小的数据集并执行密集计算(与计算数据量相比)时才有用。

希望当涉及使用Numpy进行数值计算时,有一种简单而快速的方法来并行化您的应用程序:Numba JIT。如果您显式使用并行结构(和) ,Numba 可以并行化代码。它使用线程而不是在共享内存中工作的繁重进程。如果您的代码不处理本机类型和 Numpy 数组而不是纯 Python 动态对象(列表、大整数、类等),Numba 可以克服GIL 。这是一个例子:parallel=Trueprange

import numpy as np
import numba as nb
import time

@nb.njit
def func1(x, y=1):
    return x**2 + y

@nb.njit('float64[:](int64)', parallel=True)
def func2(n):
    my_array = np.zeros(n)
    for i in nb.prange(1, n+1):
        my_array[i-1] = func1(i)
    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Numba time: {}\n".format(end-start))

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

由于 Numba 在运行时编译代码,因此它能够将循环完全优化为无操作,在这种情况下导致时间接近 0 秒。