ink*_*ink 6 python arrays optimization loops numpy
已经问过这个问题的版本,但我没有找到满意的答案。
问题:给定一个大的 numpy 向量,找到重复的向量元素的索引(其变体可以与容差进行比较)。
所以问题是 ~O(N^2) 和内存限制(至少从当前算法的角度来看)。我想知道为什么我尝试过的 Python 比等效的 C 代码慢 100 倍或更多。
import numpy as np
N = 10000
vect = np.arange(float(N))
vect[N/2] = 1
vect[N/4] = 1
dupl = []
print("init done")
counter = 0
for i in range(N):
for j in range(i+1, N):
if vect[i] == vect[j]:
dupl.append(j)
counter += 1
print("counter =", counter)
print(dupl)
# For simplicity, this code ignores repeated indices
# which can be trimmed later. Ref output is
# counter = 3
# [2500, 5000, 5000]
Run Code Online (Sandbox Code Playgroud)
我尝试使用 numpy 迭代器,但它们更糟(~ x4-5) http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html
使用 N=10,000 我在 C 中获得 0.1 秒,在 Python 中获得 12 秒(上面的代码),在 Python 中使用 np.nditer 获得 40 秒,在 Python 中使用 np.ndindex 获得 50 秒。我把它推到 N=160,000,时间按预期缩放为 N^2。
由于答案已经停止出现并且没有一个是完全令人满意的,因此我发布了我自己的解决方案作为记录。
据我了解,在这种情况下,是赋值使 Python 变慢,而不是我最初认为的嵌套循环。使用库或编译的代码消除了分配的需要,并且性能显着提高。
from __future__ import print_function
import numpy as np
from numba import jit
N = 10000
vect = np.arange(N, dtype=np.float32)
vect[N/2] = 1
vect[N/4] = 1
dupl = np.zeros(N, dtype=np.int32)
print("init done")
# uncomment to enable compiled function
#@jit
def duplicates(i, counter, dupl, vect):
eps = 0.01
ns = len(vect)
for j in range(i+1, ns):
# replace if to use approx comparison
#if abs(vect[i] - vect[j]) < eps:
if vect[i] == vect[j]:
dupl[counter] = j
counter += 1
return counter
counter = 0
for i in xrange(N):
counter = duplicates(i, counter, dupl, vect)
print("counter =", counter)
print(dupl[0:counter])
Run Code Online (Sandbox Code Playgroud)
测试
# no jit
$ time python array-test-numba.py
init done
counter = 3
[2500 5000 5000]
elapsed 10.135 s
# with jit
$ time python array-test-numba.py
init done
counter = 3
[2500 5000 5000]
elapsed 0.480 s
Run Code Online (Sandbox Code Playgroud)
编译版本(@jit 未注释)的性能接近 C 代码性能 ~0.1 - 0.2 秒。也许消除最后一个循环可以进一步提高性能。当使用 eps 进行近似比较时,性能差异甚至更大,而编译版本的差异很小。
# no jit
$ time python array-test-numba.py
init done
counter = 3
[2500 5000 5000]
elapsed 109.218 s
# with jit
$ time python array-test-numba.py
init done
counter = 3
[2500 5000 5000]
elapsed 0.506 s
Run Code Online (Sandbox Code Playgroud)
这大约是 200 倍的差异。在实际代码中,我必须将两个循环放入函数中,并使用具有变量类型的函数模板,因此它有点复杂,但不是很复杂。
方法#1
您可以使用 来模拟矢量化解决方案的迭代器依赖条件triangular matrix。这是基于this post涉及 的乘法处理iterator dependency。为了执行每个元素相对于其所有元素的元素相等vect,我们可以使用NumPy broadcasting. 最后,我们可以使用它np.count_nonzero来获取计数,因为它对于布尔数组的求和目的应该非常有效。
所以,我们会有一个像这样的解决方案 -
mask = np.triu(vect[:,None] == vect,1)
counter = np.count_nonzero(mask)
dupl = np.where(mask)[1]
Run Code Online (Sandbox Code Playgroud)
如果您只关心计数counter,我们还可以采用下面列出的两种方法。
方法#2
我们可以避免使用三角矩阵,而简单地获得整个计数,然后减去对角元素的贡献,并通过将剩余计数减半来仅考虑上三角区域或下三角区域之一,因为任一区域的贡献都是相同的。
所以,我们会有一个修改后的解决方案,如下所示 -
counter = (np.count_nonzero(vect[:,None] == vect) - vect.size)//2
Run Code Online (Sandbox Code Playgroud)
方法#3
这是一种完全不同的方法,它利用每个独特元素的计数对最终总数起到累积贡献的事实。
因此,考虑到这个想法,我们将有第三种方法,如下所示 -
count = np.bincount(vect) # OR np.unique(vect,return_counts=True)[1]
idx = count[count>1]
id_arr = np.ones(idx.sum(),dtype=int)
id_arr[0] = 0
id_arr[idx[:-1].cumsum()] = -idx[:-1]+1
counter = np.sum(id_arr.cumsum())
Run Code Online (Sandbox Code Playgroud)