在cython中创建小数组需要花费大量的时间

sta*_*icd 7 python arrays performance numpy cython

我正在为numpy编写一个新的随机数生成器,当我遇到这个非常奇怪的行为时,根据任意分布生成随机数:

这是test.pyx

#cython: boundscheck=False
#cython: wraparound=False
import numpy as np
cimport numpy as np
cimport cython

def BareBones(np.ndarray[double, ndim=1] a,np.ndarray[double, ndim=1] u,r):
    return u

def UntypedWithLoop(a,u,r):
    cdef int i,j=0
    for i in range(u.shape[0]):
        j+=i
    return u,j

def BSReplacement(np.ndarray[double, ndim=1] a, np.ndarray[double, ndim=1] u):
    cdef np.ndarray[np.int_t, ndim=1] r=np.empty(u.shape[0],dtype=int)
    cdef int i,j=0
    for i in range(u.shape[0]):
        j=i
    return r
Run Code Online (Sandbox Code Playgroud)

setup.py

from distutils.core import setup
from Cython.Build import cythonize
setup(name = "simple cython func",ext_modules = cythonize('test.pyx'),)
Run Code Online (Sandbox Code Playgroud)

分析代码

#!/usr/bin/python
from __future__ import division

import subprocess
import timeit

#Compile the cython modules before importing them
subprocess.call(['python', 'setup.py', 'build_ext', '--inplace'])

sstr="""
import test
import numpy
u=numpy.random.random(10)
a=numpy.random.random(10)
a=numpy.cumsum(a)
a/=a[-1]
r=numpy.empty(10,int)
"""

print "binary search: creates an array[N] and performs N binary searches to fill it:\n",timeit.timeit('numpy.searchsorted(a,u)',sstr)
print "Simple replacement for binary search:takes the same args as np.searchsorted and similarly returns a new array. this performs only one trivial operation per element:\n",timeit.timeit('test.BSReplacement(a,u)',sstr)

print "barebones function doing nothing:",timeit.timeit('test.BareBones(a,u,r)',sstr)
print "Untyped inputs and doing N iterations:",timeit.timeit('test.UntypedWithLoop(a,u,r)',sstr)
print "time for just np.empty()",timeit.timeit('numpy.empty(10,int)',sstr)
Run Code Online (Sandbox Code Playgroud)

二进制搜索实现采用执行时间的顺序len(u)*Log(len(a)).琐碎的cython函数按顺序len(u)运行.两者都返回len(u)的1D int数组.

然而,即使这样,没有计算的简单实现比numpy库中的完整二进制搜索花费更长的时间.(它是用C编写的:https://github.com/numpy/numpy/blob/202e78d607515e0390cffb1898e11807f117b36a/numpy/core/src/multiarray/item_selection.c参见PyArray_SearchSorted)

结果是:

binary search: creates an array[N] and performs N binary searches to fill it:
1.15157485008
Simple replacement for binary search:takes the same args as np.searchsorted and similarly returns a new array. this performs only one trivial operation per element:
3.69442796707
barebones function doing nothing: 0.87496304512
Untyped inputs and doing N iterations: 0.244267940521
time for just np.empty() 1.0983929634
Run Code Online (Sandbox Code Playgroud)

为什么np.empty()步骤花了这么多时间?我该怎么做才能获得一个我可以返回的空数组?

C函数执行此操作并运行一大堆健全性检查并在内部循环中使用更长的算法.(除了我自己的例子中的循环本身,我删除了所有逻辑)


更新

事实证明有两个不同的问题:

  1. 单独的np.empty(10)调用具有巨大的开销,并且需要花费尽可能多的时间来搜索排序以创建新数组并对其执行10次二进制搜索
  2. 只是声明缓冲区语法np.ndarray[...]也有一个巨大的开销,比接收无类型变量和迭代50次需要更多的时间.

结果为50次迭代:

binary search: 2.45336699486
Simple replacement:3.71126317978
barebones function doing nothing: 0.924916028976
Untyped inputs and doing N iterations: 0.316384077072
time for just np.empty() 1.04949498177
Run Code Online (Sandbox Code Playgroud)

Sau*_*tro 1

np.empty正如您已经看到的,在 Cython 函数内部创建会产生一些开销。在这里,您将看到一个有关如何创建空数组并将其传递给 Cython 模块以便填充正确值的示例:

n=10:

numpy.searchsorted: 1.30574745517
cython O(1): 3.28732016088
cython no array declaration 1.54710909596
Run Code Online (Sandbox Code Playgroud)

n=100:

numpy.searchsorted: 4.15200545373
cython O(1): 13.7273431067
cython no array declaration 11.4186086744
Run Code Online (Sandbox Code Playgroud)

正如您已经指出的,该numpy版本可以更好地扩展,因为它是O(len(u)*long(len(a))),并且这里的算法是O(len(u)*len(a))......

我还尝试使用 Memoryview,基本上np.ndarray[double, ndim=1]通过进行更改double[:],但在这种情况下第一个选项更快。

.pyx文件是:

from __future__ import division
import numpy as np
cimport numpy as np
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
def JustLoop(np.ndarray[double, ndim=1] a, np.ndarray[double, ndim=1] u,
             np.ndarray[int, ndim=1] r):
    cdef int i,j
    for j in range(u.shape[0]):
        if u[j] < a[0]:
            r[j] = 0
            continue

        if u[j] > a[a.shape[0]-1]:
            r[j] = a.shape[0]-1
            continue

        for i in range(1, a.shape[0]):
            if u[j] >= a[i-1] and u[j] < a[i]:
                r[j] = i
                break

@cython.boundscheck(False)
@cython.wraparound(False)
def WithArray(np.ndarray[double, ndim=1] a, np.ndarray[double, ndim=1] u):
    cdef np.ndarray[np.int_t, ndim=1] r=np.empty(u.shape[0],dtype=int)
    cdef int i,j
    for j in range(u.shape[0]):
        if u[j] < a[0]:
            r[j] = 0
            continue

        if u[j] > a[a.shape[0]-1]:
            r[j] = a.shape[0]-1
            continue

        for i in range(1, a.shape[0]):
            if u[j] >= a[i-1] and u[j] < a[i]:
                r[j] = i
                break
    return r
Run Code Online (Sandbox Code Playgroud)

.py文件:

import numpy
import subprocess
import timeit

#Compile the cython modules before importing them
subprocess.call(['python', 'setup.py', 'build_ext', '--inplace'])
from test import *

sstr="""
import test
import numpy
u=numpy.random.random(10)
a=numpy.random.random(10)
a=numpy.cumsum(a)
a/=a[-1]
a.sort()
r = numpy.empty(u.shape[0], dtype=int)
"""

print "numpy.searchsorted:",timeit.timeit('numpy.searchsorted(a,u)',sstr)
print "cython O(1):",timeit.timeit('test.WithArray(a,u)',sstr)
print "cython no array declaration",timeit.timeit('test.JustLoop(a,u,r)',sstr)
Run Code Online (Sandbox Code Playgroud)