Python:重写循环numpy数学函数以在GPU上运行

Rad*_*duS 20 python numpy theano numba tensorflow

有人可以帮我改写这个函数(doTheMath函数)来在GPU上进行计算吗?我现在用了好几天试图绕过它,但没有结果.我想也许有人可以帮助我以你认为适合作为日志的任何方式重写这个函数,因为我在最后给出了相同的结果.我试图使用@jitfrom numba但由于某种原因,它实际上比常规运行代码慢得多.由于样本量很大,我们的目标是大大缩短执行时间,因此我相信GPU是最快的方法.

我会解释一下实际发生的事情.实际数据看起来几乎与下面代码中创建的样本数据完全相同,每个样本分为大约5.000.000行的样本大小或每个文件大约150MB.总共有大约600.000.000行或20GB的数据.我必须循环遍历这些数据,逐个样本然后逐行逐行,从每行中取出最后2000行(或另一行)并运行doTheMath返回结果的函数.然后将该结果保存回硬盘驱动器,我可以使用另一个程序执行其他操作.如下所示,我不需要所有行的所有结果,只需要大于特定数量的行.如果我现在在python中运行我的函数,那么每1.000.000行大约需要62秒.考虑到所有数据以及应该用多快的速度,这是一段很长的时间.

我必须提一下,我借助于文件将真实数据文件上传到RAM,data = joblib.load(file)因此上传数据不是问题,因为每个文件只需要大约0.29秒.上传后,我运行下面的整个代码.花费最长时间的是doTheMath功能.我愿意将我在stackoverflow上获得的所有500个声望点作为奖励给那些愿意帮我重写这个简单代码以在GPU上运行的人.我的兴趣特别在于GPU,我真的很想看看它是如何解决这个问题的.

编辑/更新1: 这是一个指向真实数据的小样本的链接:data_csv.zip大约102000行真实数据1和2000行用于真实数据2a和data2b.用于minimumLimit = 400实际样本数据

编辑/更新2: 对于这篇文章后面的人,这里是以下答案的简短摘要.到目前为止,我们对原始解决方案有4个答案.@Divakar提供的那个只是对原始代码的调整.在这两个调整中,只有第一个实际上适用于这个问题,第二个是一个很好的调整但不适用于此.在其他三个答案中,其中两个是基于CPU的解决方案和一个tensorflow-GPU尝试.Paul Panzer的Tensorflow-GPU似乎很有前景,但是当我在GPU上实际运行它时它比原来慢,所以代码仍然需要改进.

另外两个基于CPU的解决方案由@PaulPanzer(一个纯粹的numpy解决方案)和@MSeifert(一个numba解决方案)提交.与原始代码相比,这两种解决方案都能提供非常好的结果和两种处理数据.在Paul Panzer提交的两个中,速度更快.它在大约3秒内处理大约1.000.000行.唯一的问题是较小的batchSizes,这可以通过切换到MSeifert提供的numba解决方案,或者甚至是在下面讨论的所有调整之后的原始代码来克服.

我非常高兴并感谢@PaulPanzer和@MSeifert所做的关于他们答案的工作.不过,由于这是一个关于基于GPU的解决方案的问题,我等着看是否有人愿意尝试GPU版本,看看与当前的CPU相比,GPU上的数据处理速度有多快解决方案.如果没有其他答案胜过@PaperPanzer的纯粹numpy解决方案那么我会接受他的答案作为正确的答案并得到赏金:)

编辑/更新3: @Divakar已经发布了一个新的答案与GPU的解决方案.在对真实数据进行测试之后,速度甚至与CPU对应解决方案无法相比.GPU在大约1.5秒内处理大约5.000.000.这太不可思议了:)我对GPU解决方案感到非常兴奋,感谢@Divakar发布它.我感谢@PaulPanzer和@MSeifert的CPU解决方案:)现在我的研究继续以令人难以置信的速度归功于GPU :)

import pandas as pd
import numpy as np
import time

def doTheMath(tmpData1, data2a, data2b):
    A = tmpData1[:, 0]
    B = tmpData1[:,1]
    C = tmpData1[:,2]
    D = tmpData1[:,3]
    Bmax = B.max()
    Cmin  = C.min()
    dif = (Bmax - Cmin)
    abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

#Declare variables
batchSize = 2000
sampleSize = 5000000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

#Create Random Sample Data
data1 = np.matrix(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #upper limit
data2b = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #lower limit
#approx. half of data2a will be smaller than data2b, but that is only in the sample data because it is randomly generated, NOT the real data. The real data2a is always higher than data2b.


#Loop through the data
t0 = time.time()
for rowNr in  range(data1.shape[0]):
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
    if(tmp_df.shape[0] == batchSize):
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result])
print('Runtime:', time.time() - t0)

#Save data results
resultArray = np.array(resultArray)
print(resultArray[:,1].sum())
resultArray = pd.DataFrame({'index':resultArray[:,0], 'result':resultArray[:,1]})
resultArray.to_csv("Result Array.csv", sep=';')
Run Code Online (Sandbox Code Playgroud)

我正在研究的PC规格:

GTX970(4gb) video card; 
i7-4790K CPU 4.00Ghz; 
16GB RAM;
a SSD drive 
running Windows 7; 
Run Code Online (Sandbox Code Playgroud)

作为一个附带问题,SLI中的第二张显卡会帮助解决这个问题吗?

Div*_*kar 10

调整#1

通常建议在使用NumPy数组时对事物进行矢量化.但是对于非常大的阵列,我认为你没有选择.因此,为了提高性能,可以在求和的最后一步进行微调.

我们可以更换的步骤,使数组1s0s和做总结:

np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
Run Code Online (Sandbox Code Playgroud)

使用np.count_nonzero它有效地计算True布尔数组中的值,而不是转换为1s0s-

np.count_nonzero((abcd <= data2a) & (abcd >= data2b))
Run Code Online (Sandbox Code Playgroud)

运行时测试 -

In [45]: abcd = np.random.randint(11,99,(10000))

In [46]: data2a = np.random.randint(11,99,(10000))

In [47]: data2b = np.random.randint(11,99,(10000))

In [48]: %timeit np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
10000 loops, best of 3: 81.8 µs per loop

In [49]: %timeit np.count_nonzero((abcd <= data2a) & (abcd >= data2b))
10000 loops, best of 3: 28.8 µs per loop
Run Code Online (Sandbox Code Playgroud)

调整#2

在处理经历隐式广播的案件时,使用预先计算的倒数.更多信息here.因此,存储它的倒数dif并使用它代替步骤:

((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ...
Run Code Online (Sandbox Code Playgroud)

样品测试 -

In [52]: A = np.random.rand(10000)

In [53]: dif = 0.5

In [54]: %timeit A/dif
10000 loops, best of 3: 25.8 µs per loop

In [55]: %timeit A*(1.0/dif)
100000 loops, best of 3: 7.94 µs per loop
Run Code Online (Sandbox Code Playgroud)

你有四个地方使用除法dif.所以,希望这也会带来显着的提升!


Div*_*kar 8

介绍和解决方案代码

好吧,你问了!因此,本文中列出的是一个实现,PyCUDA它使用轻量级包装器扩展了Python环境中CUDA的大部分功能.我们的SourceModule功能可以让我们编写和编译CUDA内核,并保留在Python环境中.

在所涉及的计算中,我们得到了最大和最小的滑动,很少的差异和划分以及比较.对于最大和最小部分,涉及块最大查找(对于每个滑动窗口),我们将使用缩减技术,如详细讨论的那样here.这将在块级别完成.对于跨滑动窗口的上层迭代,我们将使用网格级索引到CUDA资源.有关此块和网格格式的更多信息,请参阅page-18.PyCUDA还支持用于计算max和min等缩减的内置函数,但是我们失去了控制权,特别是我们打算使用专用内存,如共享和常量内存,以便在接近最佳水平的情况下利用GPU.

列出PyCUDA-NumPy解决方案代码 -

1] PyCUDA部分 -

import pycuda.autoinit
import pycuda.driver as drv
import numpy as np
from pycuda.compiler import SourceModule

mod = SourceModule("""
#define TBP 1024 // THREADS_PER_BLOCK

__device__ void get_Bmax_Cmin(float* out, float *d1, float *d2, int L, int offset)
{
    int tid = threadIdx.x;
    int inv = TBP;
    __shared__ float dS[TBP][2];

    dS[tid][0] = d1[tid+offset];  
    dS[tid][1] = d2[tid+offset];         
    __syncthreads();

    if(tid<L-TBP)  
    {
        dS[tid][0] = fmaxf(dS[tid][0] , d1[tid+inv+offset]);
        dS[tid][1] = fminf(dS[tid][1] , d2[tid+inv+offset]);
    }
    __syncthreads();
    inv = inv/2;

    while(inv!=0)   
    {
        if(tid<inv)
        {
            dS[tid][0] = fmaxf(dS[tid][0] , dS[tid+inv][0]);
            dS[tid][1] = fminf(dS[tid][1] , dS[tid+inv][1]);
        }
        __syncthreads();
        inv = inv/2;
    }
    __syncthreads();

    if(tid==0)
    {
        out[0] = dS[0][0];
        out[1] = dS[0][1];
    }   
    __syncthreads();
}

__global__ void main1(float* out, float *d0, float *d1, float *d2, float *d3, float *lowL, float *highL, int *BLOCKLEN)
{
    int L = BLOCKLEN[0];
    int tid = threadIdx.x;
    int iterID = blockIdx.x;
    float Bmax_Cmin[2];
    int inv;
    float Cmin, dif;   
    __shared__ float dS[TBP*2];   

    get_Bmax_Cmin(Bmax_Cmin, d1, d2, L, iterID);  
    Cmin = Bmax_Cmin[1];
    dif = (Bmax_Cmin[0] - Cmin);

    inv = TBP;

    dS[tid] = (d0[tid+iterID] + d1[tid+iterID] + d2[tid+iterID] + d3[tid+iterID] - 4.0*Cmin) / (4.0*dif);
    __syncthreads();

    if(tid<L-TBP)  
        dS[tid+inv] = (d0[tid+inv+iterID] + d1[tid+inv+iterID] + d2[tid+inv+iterID] + d3[tid+inv+iterID] - 4.0*Cmin) / (4.0*dif);                   

     dS[tid] = ((dS[tid] >= lowL[tid]) & (dS[tid] <= highL[tid])) ? 1 : 0;
     __syncthreads();

     if(tid<L-TBP)
         dS[tid] += ((dS[tid+inv] >= lowL[tid+inv]) & (dS[tid+inv] <= highL[tid+inv])) ? 1 : 0;
     __syncthreads();

    inv = inv/2;
    while(inv!=0)   
    {
        if(tid<inv)
            dS[tid] += dS[tid+inv];
        __syncthreads();
        inv = inv/2;
    }

    if(tid==0)
        out[iterID] = dS[0];
    __syncthreads();

}
""")
Run Code Online (Sandbox Code Playgroud)

请注意,这THREADS_PER_BLOCK, TBP是基于batchSize.这里的经验法则是将2值的幂分配给TBP小于的值batchSize.因此,batchSize = 2000我们需要TBPas 1024.

2] NumPy部分 -

def gpu_app_v1(A, B, C, D, batchSize, minimumLimit):
    func1 = mod.get_function("main1")
    outlen = len(A)-batchSize+1

    # Set block and grid sizes
    BSZ = (1024,1,1)
    GSZ = (outlen,1)

    dest = np.zeros(outlen).astype(np.float32)
    N = np.int32(batchSize)
    func1(drv.Out(dest), drv.In(A), drv.In(B), drv.In(C), drv.In(D), \
                     drv.In(data2b), drv.In(data2a),\
                     drv.In(N), block=BSZ, grid=GSZ)
    idx = np.flatnonzero(dest >= minimumLimit)
    return idx, dest[idx]
Run Code Online (Sandbox Code Playgroud)

标杆

我在GTX 960M上测试过.请注意,PyCUDA希望数组具有连续的顺序.因此,我们需要对列进行切片并进行复制.我期待/假设可以从文件中读取数据,使得数据沿着行传播而不是作为列传播.因此,暂时将它们排除在基准测试功能之外.

原创方法 -

def org_app(data1, batchSize, minimumLimit):
    resultArray = []
    for rowNr in  range(data1.shape[0]-batchSize+1):
        tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result]) 
    return resultArray
Run Code Online (Sandbox Code Playgroud)

时间和验证 -

In [2]: #Declare variables
   ...: batchSize = 2000
   ...: sampleSize = 50000
   ...: resultArray = []
   ...: minimumLimit = 490 #use 400 on the real sample data
   ...: 
   ...: #Create Random Sample Data
   ...: data1 = np.random.uniform(1, 100000, (sampleSize + batchSize, 4)).astype(np.float32)
   ...: data2b = np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: data2a = data2b + np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: 
   ...: # Make column copies
   ...: A = data1[:,0].copy()
   ...: B = data1[:,1].copy()
   ...: C = data1[:,2].copy()
   ...: D = data1[:,3].copy()
   ...: 
   ...: gpu_out1,gpu_out2 = gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
   ...: cpu_out1,cpu_out2 = np.array(org_app(data1, batchSize, minimumLimit)).T
   ...: print(np.allclose(gpu_out1, cpu_out1))
   ...: print(np.allclose(gpu_out2, cpu_out2))
   ...: 
True
False
Run Code Online (Sandbox Code Playgroud)

因此,CPU和GPU计数之间存在一些差异.让我们调查一下 -

In [7]: idx = np.flatnonzero(~np.isclose(gpu_out2, cpu_out2))

In [8]: idx
Out[8]: array([12776, 15208, 17620, 18326])

In [9]: gpu_out2[idx] - cpu_out2[idx]
Out[9]: array([-1., -1.,  1.,  1.])
Run Code Online (Sandbox Code Playgroud)

有四个不匹配计数的实例.这些都是最大的1.通过研究,我发现了一些有关这方面的信息.基本上,因为我们使用数学内在函数进行最大和最小计算,而我认为这些因素导致浮动pt表示中的最后一个二进制位与CPU对应位置不同.这被称为ULP错误,并已经详细discused herehere.

最后,把问题放在一边,让我们来看看最重要的一点,即表现 -

In [10]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 2.18 s per loop

In [11]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
10 loops, best of 3: 82.5 ms per loop

In [12]: 2180.0/82.5
Out[12]: 26.424242424242426
Run Code Online (Sandbox Code Playgroud)

让我们尝试更大的数据集.有了sampleSize = 500000,我们得到 -

In [14]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 23.2 s per loop

In [15]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
1 loops, best of 3: 821 ms per loop

In [16]: 23200.0/821
Out[16]: 28.25822168087698
Run Code Online (Sandbox Code Playgroud)

因此,加速保持不变27.

限制:

1)我们正在使用float32数字,因为GPU最适合这些.特别是在非服务器GPU上的双精度在性能方面并不受欢迎,因为你正在使用这样的GPU,我用float32进行了测试.

进一步改进:

1)我们可以用更快constant memory的饲料data2adata2b,而不是使用global memory.


MSe*_*ert 5

在开始调整目标(GPU)或使用其他任何功能(即,并行执行)之前,您可能需要考虑如何改善已经存在的代码。您使用了 -tag,所以我将使用它来改进代码:首先,我们对数组进行运算而不对矩阵进行运算:

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit
Run Code Online (Sandbox Code Playgroud)

每次调用时,doTheMath您都希望返回整数,但是您会使用很多数组并创建很多中间数组:

abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
Run Code Online (Sandbox Code Playgroud)

这将在每个步骤中创建一个中间数组:

  • tmp1 = A-Cmin
  • tmp2 = tmp1 / dif
  • tmp3 = B - Cmin
  • tmp4 = tmp3 / dif
  • ...你的要旨。

但是,这是一个reduce函数(数组->整数),因此拥有大量中间数组是不必要的权重,只需计算“ fly”的值即可。

import numba as nb

@nb.njit
def doTheMathNumba(tmpData, data2a, data2b):
    Bmax = np.max(tmpData[:, 1])
    Cmin = np.min(tmpData[:, 2])
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    sum_ = 0
    for i in range(tmpData.shape[0]):
        val = (tmpData[i, 0] + tmpData[i, 1] + tmpData[i, 2] + tmpData[i, 3]) / 4 * idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_
Run Code Online (Sandbox Code Playgroud)

为了避免多次操作,我在这里做了其他操作:

(((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4
= ((A - Cmin + B - Cmin + C - Cmin + D - Cmin) / dif) / 4
= (A + B + C + D - 4 * Cmin) / (4 * dif)
= (A + B + C + D) / (4 * dif) - (Cmin / dif)
Run Code Online (Sandbox Code Playgroud)

实际上,这在我的计算机上将执行时间减少了将近10倍:

%timeit doTheMath(tmp_df, data2a, data2b)       # 1000 loops, best of 3: 446 µs per loop
%timeit doTheMathNumba(tmp_df, data2a, data2b)  # 10000 loops, best of 3: 59 µs per loop
Run Code Online (Sandbox Code Playgroud)

当然,还存在其他改进,例如使用滚动的最小/最大值来计算BmaxCmin,这至少会使一部分计算O(sampleSize)代替来运行O(samplesize * batchsize)。这也将有可能重新使用一些的(A + B + C + D) / (4 * dif) - (Cmin / dif)计算,因为,如果CminBmax没有为下一个样品这些值不改变不同。这样做比较复杂,因为比较有所不同。但是绝对有可能!看这里:

import time
import numpy as np
import numba as nb

@nb.njit
def doTheMathNumba(abcd, data2a, data2b, Bmax, Cmin):
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    quarter_idiff = 0.25 * idiff
    sum_ = 0
    for i in range(abcd.shape[0]):
        val = abcd[i] * quarter_idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

@nb.njit
def doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, resultArray):
    found = 0
    for rowNr in range(data1.shape[0]):
        if(abcd[rowNr:rowNr + batchSize].shape[0] == batchSize):
            result = doTheMathNumba(abcd[rowNr:rowNr + batchSize], 
                                    data2a, data2b, Bmaxs[rowNr], Cmins[rowNr])
            if (result >= minimumLimit):
                resultArray[found, 0] = rowNr
                resultArray[found, 1] = result
                found += 1
    return resultArray[:found]

#Declare variables
batchSize = 2000
sampleSize = 50000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

from scipy import ndimage
t0 = time.time()
abcd = np.sum(data1, axis=1)
Bmaxs = ndimage.maximum_filter1d(data1[:, 1], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))  # correction for even shapes
Cmins = ndimage.minimum_filter1d(data1[:, 2], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))

result = np.zeros((sampleSize, 2), dtype=np.int64)
doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, result)
print('Runtime:', time.time() - t0)
Run Code Online (Sandbox Code Playgroud)

这给了我一个Runtime: 0.759593152999878(在numba编译函数之后!),而您的原始版本则拥有了Runtime: 24.68975639343262。现在我们快了30倍!

根据您的样本量,仍然可以,Runtime: 60.187848806381226但是还算不错,对吧?

即使我自己没有做过,表示有可能编写“用于CUDA GPU的Numba”,而且似乎并不复杂。