numpy 数组上的高效循环

ink*_*ink 6 python arrays optimization loops numpy

已经问过这个问题的版本,但我没有找到满意的答案。

问题:给定一个大的 numpy 向量,找到重复的向量元素的索引(其变体可以与容差进行比较)。

所以问题是 ~O(N^2) 和内存限制(至少从当前算法的角度来看)。我想知道为什么我尝试过的 Python 比等效的 C 代码慢 100 倍或更多。

import numpy as np
N = 10000
vect = np.arange(float(N))
vect[N/2] = 1
vect[N/4] = 1
dupl = []
print("init done")
counter = 0
for i in range(N):
    for j in range(i+1, N):
        if vect[i] == vect[j]:
            dupl.append(j)
            counter += 1

print("counter =", counter)
print(dupl)
# For simplicity, this code ignores repeated indices 
# which can be trimmed later. Ref output is
# counter = 3
# [2500, 5000, 5000]
Run Code Online (Sandbox Code Playgroud)

我尝试使用 numpy 迭代器,但它们更糟(~ x4-5) http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html

使用 N=10,000 我在 C 中获得 0.1 秒,在 Python 中获得 12 秒(上面的代码),在 Python 中使用 np.nditer 获得 40 秒,在 Python 中使用 np.ndindex 获得 50 秒。我把它推到 N=160,000,时间按预期缩放为 N^2。

ink*_*ink 9

由于答案已经停止出现并且没有一个是完全令人满意的,因此我发布了我自己的解决方案作为记录。

据我了解,在这种情况下,是赋值使 Python 变慢,而不是我最初认为的嵌套循环。使用库或编译的代码消除了分配的需要,并且性能显着提高。

from __future__ import print_function
import numpy as np
from numba import jit

N = 10000
vect = np.arange(N, dtype=np.float32)

vect[N/2] = 1
vect[N/4] = 1
dupl = np.zeros(N, dtype=np.int32)

print("init done")
# uncomment to enable compiled function
#@jit
def duplicates(i, counter, dupl, vect):
    eps = 0.01
    ns = len(vect)
    for j in range(i+1, ns):
        # replace if to use approx comparison
        #if abs(vect[i] - vect[j]) < eps:
        if vect[i] == vect[j]:
            dupl[counter] = j
            counter += 1
    return counter

counter = 0
for i in xrange(N):
    counter = duplicates(i, counter, dupl, vect)

print("counter =", counter)
print(dupl[0:counter])
Run Code Online (Sandbox Code Playgroud)

测试

# no jit
$ time python array-test-numba.py
init done
counter = 3
[2500 5000 5000]

elapsed 10.135 s

# with jit
$ time python array-test-numba.py
init done
counter = 3
[2500 5000 5000]

elapsed 0.480 s
Run Code Online (Sandbox Code Playgroud)

编译版本(@jit 未注释)的性能接近 C 代码性能 ~0.1 - 0.2 秒。也许消除最后一个循环可以进一步提高性能。当使用 eps 进行近似比较时,性能差异甚至更大,而编译版本的差异很小。

# no jit
$ time python array-test-numba.py
init done
counter = 3
[2500 5000 5000]

elapsed 109.218 s

# with jit
$ time python array-test-numba.py
init done
counter = 3
[2500 5000 5000]

elapsed 0.506 s
Run Code Online (Sandbox Code Playgroud)

这大约是 200 倍的差异。在实际代码中,我必须将两个循环放入函数中,并使用具有变量类型的函数模板,因此它有点复杂,但不是很复杂。


Div*_*kar 1

方法#1

您可以使用 来模拟矢量化解决方案的迭代器依赖条件triangular matrix。这是基于this post涉及 的乘法处理iterator dependency。为了执行每个元素相对于其所有元素的元素相等vect,我们可以使用NumPy broadcasting. 最后,我们可以使用它np.count_nonzero来获取计数,因为它对于布尔数组的求和目的应该非常有效。

所以,我们会有一个像这样的解决方案 -

mask = np.triu(vect[:,None] == vect,1)
counter = np.count_nonzero(mask)
dupl = np.where(mask)[1]
Run Code Online (Sandbox Code Playgroud)

如果您只关心计数counter,我们还可以采用下面列出的两种方法。

方法#2

我们可以避免使用三角矩阵,而简单地获得整个计数,然后减去对角元素的贡献,并通过将剩余计数减半来仅考虑上三角区域或下三角区域之一,因为任一区域的贡献都是相同的。

所以,我们会有一个修改后的解决方案,如下所示 -

counter = (np.count_nonzero(vect[:,None] == vect) - vect.size)//2
Run Code Online (Sandbox Code Playgroud)

方法#3

这是一种完全不同的方法,它利用每个独特元素的计数对最终总数起到累积贡献的事实。

因此,考虑到这个想法,我们将有第三种方法,如下所示 -

count = np.bincount(vect) # OR np.unique(vect,return_counts=True)[1]
idx = count[count>1]
id_arr = np.ones(idx.sum(),dtype=int)
id_arr[0] = 0
id_arr[idx[:-1].cumsum()] = -idx[:-1]+1
counter = np.sum(id_arr.cumsum())
Run Code Online (Sandbox Code Playgroud)