How can I parallelize a sum calculation in Python/NumPy?

Kev*_*vin 3 python parallel-processing numpy sum scipy

I have a sum that I'm trying to calculate, and I'm having difficulty parallelizing the code. The calculation I want to parallelize is fairly complex (it uses NumPy arrays and SciPy sparse matrices). It spits out a NumPy array, and I want to sum the output arrays from about 1000 of these calculations. Ideally, I would keep a running sum over all the iterations, but I haven't figured out how to do this.

So far, I've tried joblib's Parallel function and the pool.map function from Python's multiprocessing package. For both, I use an inner function that returns a NumPy array. These functions return a list, which I convert to a NumPy array and then sum.

However, after the joblib Parallel function finishes all its iterations, the main program never resumes (it looks like the original job is in a suspended state, using 0% CPU). When I use pool.map, I get a memory error after all the iterations complete.

Is there a way to simply parallelize a running sum of arrays?

Edit: The goal is to do something like the following, except in parallel.

import numpy as np

def summers(num_iters):
    sumArr = np.zeros((1,512*512)) #initialize the sum
    for index in range(num_iters):
        sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array

    return sumArr

Kev*_*vin 5

I figured out how to parallelize the sum of arrays with multiprocessing, apply_async, and callbacks, so I'm posting this here for other people. I used the example page from Parallel Python for the Sum callback class, although I did not actually use that package in my implementation. It did, however, give me the idea of using callbacks. Here's the simplified code I ended up using, and it does what I wanted it to do.

import multiprocessing
import numpy as np
import threading

class Sum: #again, this class is from Parallel Python's example code (I modified it for an array and added comments)
    def __init__(self):
        self.value = np.zeros((1,512*512)) #this is the initialization of the sum
        self.lock = threading.Lock()
        self.count = 0

    def add(self, value):
        self.lock.acquire() #lock so the sum is correct if two processes return at the same time
        self.count += 1
        self.value += value #the actual summation
        self.lock.release()

def computation(index):
    array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
    return array1

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    sumArr = Sum() #create an instance of the callback class and zero the sum
    for index in range(num_iters):
        singlepoolresult = pool.apply_async(computation, (index,), callback=sumArr.add)

    pool.close()
    pool.join() #waits for all the processes to finish

    return sumArr.value
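To make the callback pattern easier to try out, here is a minimal, self-contained sketch of the same idea with a small placeholder array shape and a stand-in computation (the shape, worker count, and iteration count are just illustrative values):

```python
import multiprocessing
import threading

import numpy as np

class Sum:
    """Running sum updated from apply_async callbacks (callbacks all run in the parent process)."""
    def __init__(self, shape):
        self.value = np.zeros(shape)
        self.lock = threading.Lock()

    def add(self, value):
        with self.lock:  # guard the in-place update
            self.value += value

def computation(index):
    # stand-in for the real array-returning computation
    return np.ones((1, 16)) * index

if __name__ == '__main__':
    total = Sum((1, 16))
    pool = multiprocessing.Pool(processes=4)
    for index in range(5):
        pool.apply_async(computation, (index,), callback=total.add)
    pool.close()
    pool.join()
    # every entry of total.value is now 0 + 1 + 2 + 3 + 4 = 10
```

One design note: apply_async callbacks are all invoked from a single result-handler thread in the parent process, so the lock is more of a safety belt than a strict necessity here.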

I was also able to get this working with a parallel map, which was suggested in another answer. I had tried this before, but I wasn't implementing it correctly. Both ways work, and I think this answer does a good job of explaining which method to use (map or apply_async). For the map version, you don't need to define the Sum class, and the summers function becomes:

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these
    sumArr = np.zeros((1,512*512))              #but I do to make sure I have the memory

    outputArr = np.array(pool.map(computation, range(num_iters)))
    sumArr = outputArr.sum(0)

    pool.close() #not sure if this is still needed since map waits for all iterations

    return sumArr
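One more variant worth sketching (my own addition, not from the answers above): if pool.map hits a memory error because it materializes all ~1000 result arrays at once, `multiprocessing.Pool.imap_unordered` lets the parent accumulate a true running sum while holding only one result array at a time. A sketch with a stand-in computation:

```python
import multiprocessing

import numpy as np

def computation(index):
    # stand-in for the real array-returning computation
    return np.ones((1, 512 * 512)) * index

def summers_streaming(num_iters):
    # imap_unordered yields each result as its worker finishes; the parent
    # adds it into the running sum and discards it, so memory use stays flat
    sumArr = np.zeros((1, 512 * 512))
    with multiprocessing.Pool(processes=8) as pool:
        for arr in pool.imap_unordered(computation, range(num_iters)):
            sumArr += arr
    return sumArr

if __name__ == '__main__':
    result = summers_streaming(4)  # every entry is 0 + 1 + 2 + 3 = 6
```

Since addition is commutative, the unordered arrival of results doesn't affect the sum; if order ever mattered, plain `imap` preserves it at a small cost in latency.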