np.nonzero的Python/Cython/Numpy优化

Dbr*_*cks 5 python numpy cython

我有一段代码,我正在尝试优化.大多数代码执行时间cdef np.ndarray index = np.argwhere(array==1) 取决于Where数组是numpy是512x512,512 numpy数组的零和1.有关加速这个的任何想法?使用Python 2.7,Numpy 1.8.1

球形度函数

def sphericity(self,array):

    #Pass an mask array (1's are marked, 0's ignored)
    cdef np.ndarray index = np.argwhere(array==1)
    cdef int xSize,ySize,zSize
    xSize,ySize,zSize=array.shape

    cdef int sa,vol,voxelIndex,x,y,z,neighbors,xDiff,yDiff,zDiff,x1,y1,z1
    cdef float onethird,twothirds,sp
    sa=vol=0 #keep running tally of volume and surface area
    #cdef int nonZeroCount = (array != 0).sum() #Replaces np.count_nonzero(array) for speed
    for voxelIndex in range(np.count_nonzero(array)):
    #for voxelIndex in range(nonZeroCount):
        x=index[voxelIndex,0]
        y=index[voxelIndex,1]
        z=index[voxelIndex,2]
        #print x,y,z,array[x,y,z]
        neighbors=0
        vol+=1

        for xDiff in [-1,0,1]:
            for yDiff in [-1,0,1]:
                for zDiff in [-1,0,1]:
                    if abs(xDiff)+abs(yDiff)+abs(zDiff)==1:
                        x1=x+xDiff
                        y1=y+yDiff
                        z1=z+zDiff
                        if x1>=0 and y1>=0 and z1>=0 and x1<xSize and y1<ySize and z1<zSize:
                            #print '-',x1,y1,z1,array[x1,y1,z1]
                            if array[x1,y1,z1]:
                                #print '-',x1,y1,z1,array[x1,y1,z1]
                                neighbors+=1

        #print 'had this many neighbors',neighbors
        sa+=(6-neighbors)

    onethird=float(1)/float(3)
    twothirds=float(2)/float(3)
    sph = ((np.pi**onethird)*((6*vol)**twothirds)) / sa
    #print 'sphericity',sphericity
    return sph
Run Code Online (Sandbox Code Playgroud)

分析测试

#Imports
import pstats, cProfile
import numpy as np
import pyximport
pyximport.install(setup_args={"script_args":["--compiler=mingw32"], "include_dirs":np.get_include()}, reload_support=True) #Generate cython version

#Create fake array to calc sphericity
fakeArray=np.zeros((512,512,512))
fakeArray[200:300,200:300,200:300]=1

#Profiling stuff
cProfile.runctx("sphericity(fakeArray)", globals(), locals(), "Profile.prof")
s = pstats.Stats("Profile.prof")
s.strip_dirs().sort_stats("time").print_stats()
Run Code Online (Sandbox Code Playgroud)

分析输出

Mon Oct 06 11:49:57 2014    Profile.prof

         12 function calls in 4.373 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    3.045    3.045    4.373    4.373 <string>:1(<module>)
        1    1.025    1.025    1.025    1.025 {method 'nonzero' of 'numpy.ndarray' objects}
        2    0.302    0.151    0.302    0.151 {numpy.core.multiarray.array}
        1    0.001    0.001    1.328    1.328 numeric.py:731(argwhere)
        1    0.000    0.000    0.302    0.302 fromnumeric.py:492(transpose)
        1    0.000    0.000    0.302    0.302 fromnumeric.py:38(_wrapit)
        1    0.000    0.000    0.000    0.000 {method 'transpose' of 'numpy.ndarray' objects}
        1    0.000    0.000    0.302    0.302 numeric.py:392(asarray)
        1    0.000    0.000    0.000    0.000 numeric.py:462(asanyarray)
        1    0.000    0.000    0.000    0.000 {getattr}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
Run Code Online (Sandbox Code Playgroud)

Bla*_*lsh 7

Jaime可能给出了一个很好的答案,但我会评论改进Cython代码并添加性能比较.

首先,您应该使用'annotate'功能cython -a filename.pyx,这将生成一个HTML文件.在浏览器中加载它并突出显示带有黄橙色的"慢"行,这表示可以进行哪些改进.

注释会立即显示两个容易修复的内容:

将习语转换成cython理解的东西

首先,这些线路很慢:

        for xDiff in [-1,0,1]:
            for yDiff in [-1,0,1]:
                for zDiff in [-1,0,1]:
Run Code Online (Sandbox Code Playgroud)

原因是Cython不知道如何将列表迭代转换为干净的c代码.需要将其转换为Cython可以优化的等效代码,即"范围内"形式:

        for xDiff in range(-1, 2):
            for yDiff in range(-1, 2):
                for zDiff in range(-1, 2):
Run Code Online (Sandbox Code Playgroud)

键入数组以进行快速索引

接下来是这条线很慢:

                            if array[x1,y1,z1]:
Run Code Online (Sandbox Code Playgroud)

原因是array没有给出类型.因此它使用python级索引而不是c级索引.要解决这个问题,你需要给数组一个类型,这可以用这种方式完成:

def sphericity(np.ndarray[np.uint8_t, ndim=3] array):
Run Code Online (Sandbox Code Playgroud)

假设数组的类型是'uint8',用适当的类型替换(注意:Cython不支持'np.bool'类型,因此我使用'uint8')

您也可以使用内存视图,不能在内存视图上使用numpy函数,但可以在数组上创建视图,然后索引视图而不是数组:

    cdef np.uint8_t array_view [:, :, :] = array
    ...
                                    if array_view[x1,y1,z1]:
Run Code Online (Sandbox Code Playgroud)

内存视图可能会稍微快一点,并在数组(python级别调用)和视图(c级别调用)之间进行明确划分.如果你没有使用numpy函数,你可以使用内存视图没有问题.

重写代码以避免多次遍历数组

剩下的是计算index并且nonZeroCount都很慢,这是出于各种原因,但主要仅涉及数据的绝对大小(基本上,迭代超过512*512*512元素只需要时间!)一般来说Numpy可以做的任何事情,优化的Cython可以做得更快(通常快2-10倍) - numpy只是为你节省了很多重新发明的轮子和大量的打字,让你在更高的层次上思考(如果你不是ac程序员,你可能不会能够很好地优化cython).但在这种情况下,它很容易,你可以消除indexnonZeroCount所有相关的代码,只是这样做:

    for x in range(0, xSize):
        for y in range(0, ySize):
            for z in range(0, zSize):
                if array[x,y,z] == 0:
                    continue
                ... 
Run Code Online (Sandbox Code Playgroud)

这非常快,因为c(干净的Cython编译完美无瑕)可以毫不费力地每秒执行数十亿次操作.通过消除indexnonZeroCount步骤,您基本上可以在整个阵列上保存两次完整的迭代,即使在最大速度下也需要至少约0.1秒.更重要的是CPU缓存,整个阵列是128mb,比cpu缓存大得多,因此一次性完成所有操作可以更好地利用cpu缓存(如果阵列完全适合CPU,多次传递就没那么重要了高速缓存).

优化版本

以下是我的优化版本的完整代码:

#cython: boundscheck=False, nonecheck=False, wraparound=False
import numpy as np
cimport numpy as np

def sphericity2(np.uint8_t [:, :, :] array):

    #Pass an mask array (1's are marked, 0's ignored)
    cdef int xSize,ySize,zSize
    xSize=array.shape[0]
    ySize=array.shape[1]
    zSize=array.shape[2]

    cdef int sa,vol,x,y,z,neighbors,xDiff,yDiff,zDiff,x1,y1,z1
    cdef float onethird,twothirds,sp
    sa=vol=0 #keep running tally of volume and surface area

    for x in range(0, xSize):
        for y in range(0, ySize):
            for z in range(0, zSize):
                if array[x,y,z] == 0:
                    continue

                neighbors=0
                vol+=1

                for xDiff in range(-1, 2):
                    for yDiff in range(-1, 2):
                        for zDiff in range(-1, 2):
                            if abs(xDiff)+abs(yDiff)+abs(zDiff)==1:
                                x1=x+xDiff
                                y1=y+yDiff
                                z1=z+zDiff
                                if x1>=0 and y1>=0 and z1>=0 and x1<xSize and y1<ySize and z1<zSize:
                                    #print '-',x1,y1,z1,array[x1,y1,z1]
                                    if array[x1,y1,z1]:
                                        #print '-',x1,y1,z1,array[x1,y1,z1]
                                        neighbors+=1

                #print 'had this many neighbors',neighbors
                sa+=(6-neighbors)

    onethird=float(1)/float(3)
    twothirds=float(2)/float(3)
    sph = ((np.pi**onethird)*((6*vol)**twothirds)) / sa
    #print 'sphericity',sphericity
    return sph
Run Code Online (Sandbox Code Playgroud)

球形执行时间比较:

Original         : 2.123s
Jaime's          : 1.819s
Optimized Cython : 0.136s
@ moarningsun    : 0.090s

在所有Cython解决方案运行速度超过15倍的情况下,使用展开的内部循环(请参阅注释),它运行速度提高了23倍.


Jai*_*ime 6

你可以从vanilla numpy获得你的代码所做的大部分工作,而不需要Cython.中心的事情是获得一种有效的计算邻居的方法,这可以and通过从输入数组获得的掩码片来完成.总而言之,我认为以下内容与您的代码相同,但重复次数少得多:

def sphericity(arr):
    mask = arr != 0
    vol = np.count_nonzero(mask)
    counts = np.zeros_like(arr, dtype=np.intp)
    for dim, size in enumerate(arr.shape):
        slc = (slice(None),) * dim
        axis_mask = (mask[slc + (slice(None, -1),)] &
                     mask[slc + (slice(1, None),)])
        counts[slc + (slice(None, -1),)] += axis_mask
        counts[slc + (slice(1, None),)] += axis_mask
    sa = np.sum(6 - counts[counts != 0])

    return np.pi**(1./3.)*(6*vol)**(2./3.) / sa
Run Code Online (Sandbox Code Playgroud)