Nip*_*tra 5 python algorithm numpy vectorization cython
known_array:numpy数组; 仅由标量值组成;shape: (m, 1)
test_array:numpy数组; 仅由标量值组成;shape: (n, 1)
indices:numpy数组; shape: (n, 1); 对于每个值,test_array查找最接近的值的索引known_array
residual:numpy数组; shape: (n, 1); 对于每个值,test_array找到与最接近的值的差异known_array
In [17]: known_array = np.array([random.randint(-30,30) for i in range(5)])
In [18]: known_array
Out[18]: array([-24, -18, -13, -30, 29])
In [19]: test_array = np.array([random.randint(-10,10) for i in range(10)])
In [20]: test_array
Out[20]: array([-6, 4, -6, 4, 8, -4, 8, -6, 2, 8])
Run Code Online (Sandbox Code Playgroud)
def find_nearest(known_array, value):
idx = (np.abs(known_array - value)).argmin()
diff = known_array[idx] - value
return [idx, -diff]
In [22]: indices = np.zeros(len(test_array))
In [23]: residual = np.zeros(len(test_array))
In [24]: for i in range(len(test_array)):
....: [indices[i], residual[i]] = find_nearest(known_array, test_array[i])
....:
In [25]: indices
Out[25]: array([ 2., 2., 2., 2., 2., 2., 2., 2., 2., 2.])
In [26]: residual
Out[26]: array([ 7., 17., 7., 17., 21., 9., 21., 7., 15., 21.])
Run Code Online (Sandbox Code Playgroud)
加快这项任务的最佳方法是什么?Cython是一个选项,但是,我总是希望能够删除for循环并让代码保持纯粹的NumPy.
注意:请参阅Stack Overflow问题
我做了一些小基准来比较非矢量化和矢量化解决方案(接受的答案).
In [48]: [indices1, residual1] = find_nearest_vectorized(known_array, test_array)
In [53]: [indices2, residual2] = find_nearest_non_vectorized(known_array, test_array)
In [54]: indices1==indices2
Out[54]: array([ True, True, True, True, True, True, True, True, True, True], dtype=bool)
In [55]: residual1==residual2
Out[55]: array([ True, True, True, True, True, True, True, True, True, True], dtype=bool)
In [56]: %timeit [indices2, residual2] = find_nearest_non_vectorized(known_array, test_array)
10000 loops, best of 3: 173 µs per loop
In [57]: %timeit [indices1, residual1] = find_nearest_vectorized(known_array, test_array)
100000 loops, best of 3: 16.8 µs per loop
Run Code Online (Sandbox Code Playgroud)
大概加速10倍!
known_array 没有排序.
我在下面的@cyborg的答案中给出了基准测试.
案例1:如果known_array被分类
known_array = np.arange(0,1000)
test_array = np.random.randint(0, 100, 10000)
print('Speedups:')
base_time = time_f('base')
for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
print func_name + ' is x%.1f faster than base.' % (base_time / time_f(func_name))
assert np.allclose(base(known_array, test_array), eval(func_name+'(known_array, test_array)'))
Run Code Online (Sandbox Code Playgroud)
Speedups:
diffs is x0.4 faster than base.
searchsorted1 is x81.3 faster than base.
searchsorted2 is x107.6 faster than base.
Run Code Online (Sandbox Code Playgroud)
首先,对于大型数组,diffs方法实际上速度较慢,它也占用了大量的RAM,当我在实际数据上运行时,我的系统挂起了.
案例2:什么时候known_array没有排序; 这代表实际情况
known_array = np.random.randint(0,100,100)
test_array = np.random.randint(0, 100, 100)
Run Code Online (Sandbox Code Playgroud)
Speedups:
diffs is x8.9 faster than base.
AssertionError Traceback (most recent call last)
<ipython-input-26-3170078c217a> in <module>()
5 for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
6 print func_name + ' is x%.1f faster than base.' % (base_time / time_f(func_name))
----> 7 assert np.allclose(base(known_array, test_array), eval(func_name+'(known_array, test_array)'))
AssertionError:
searchsorted1 is x14.8 faster than base.
Run Code Online (Sandbox Code Playgroud)
我还必须评论该方法也应该是内存有效的.否则我的8 GB RAM是不够的.在基本情况下,它很容易就足够了.
如果数组很大,您应该使用searchsorted:
import numpy as np
np.random.seed(0)
known_array = np.random.rand(1000)
test_array = np.random.rand(400)
%%time
differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
indices = np.abs(differences).argmin(axis=0)
residual = np.diagonal(differences[indices,])
Run Code Online (Sandbox Code Playgroud)
输出:
CPU times: user 11 ms, sys: 15 ms, total: 26 ms
Wall time: 26.4 ms
Run Code Online (Sandbox Code Playgroud)
searchsorted 版:
%%time
index_sorted = np.argsort(known_array)
known_array_sorted = known_array[index_sorted]
idx1 = np.searchsorted(known_array_sorted, test_array)
idx2 = np.clip(idx1 - 1, 0, len(known_array_sorted)-1)
diff1 = known_array_sorted[idx1] - test_array
diff2 = test_array - known_array_sorted[idx2]
indices2 = index_sorted[np.where(diff1 <= diff2, idx1, idx2)]
residual2 = test_array - known_array[indices]
Run Code Online (Sandbox Code Playgroud)
输出:
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 311 µs
Run Code Online (Sandbox Code Playgroud)
我们可以检查结果是否相同:
assert np.all(residual == residual2)
assert np.all(indices == indices2)
Run Code Online (Sandbox Code Playgroud)
TL; DR:使用numpy.searchsorted().
import inspect
from timeit import timeit
import numpy as np
known_array = np.arange(-10, 10)
test_array = np.random.randint(-10, 10, 1000)
number = 1000
def base(known_array, test_array):
def find_nearest(known_array, value):
idx = (np.abs(known_array - value)).argmin()
return idx
indices = np.zeros_like(test_array, dtype=known_array.dtype)
for i in range(len(test_array)):
indices[i] = find_nearest(known_array, test_array[i])
return indices
def diffs(known_array, test_array):
differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
indices = np.abs(differences).argmin(axis=0)
return indices
def searchsorted1(known_array, test_array):
index_sorted = np.argsort(known_array)
known_array_sorted = known_array[index_sorted]
idx1 = np.searchsorted(known_array_sorted, test_array)
idx1[idx1 == len(known_array)] = len(known_array)-1
idx2 = np.clip(idx1 - 1, 0, len(known_array_sorted)-1)
diff1 = known_array_sorted[idx1] - test_array
diff2 = test_array - known_array_sorted[idx2]
indices2 = index_sorted[np.where(diff1 <= diff2, idx1, idx2)]
return indices2
def searchsorted2(known_array, test_array):
index_sorted = np.argsort(known_array)
known_array_sorted = known_array[index_sorted]
known_array_middles = known_array_sorted[1:] - np.diff(known_array_sorted.astype('f'))/2
idx1 = np.searchsorted(known_array_middles, test_array)
indices = index_sorted[idx1]
return indices
def time_f(func_name):
return timeit(func_name+"(known_array, test_array)",
'from __main__ import known_array, test_array, ' + func_name, number=number)
print('Speedups:')
base_time = time_f('base')
for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
print func_name + ' is x%.1f faster than base.' % (base_time / time_f(func_name))
Run Code Online (Sandbox Code Playgroud)
输出:
Speedups:
diffs is x29.9 faster than base.
searchsorted1 is x37.4 faster than base.
searchsorted2 is x64.3 faster than base.
Run Code Online (Sandbox Code Playgroud)
例如,您可以计算 on go 中的所有差异:
differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
Run Code Online (Sandbox Code Playgroud)
并使用argmin花哨的索引来np.diagonal获得所需的索引和差异:
indices = np.abs(differences).argmin(axis=0)
residual = np.diagonal(differences[indices,])
Run Code Online (Sandbox Code Playgroud)
因此对于
>>> known_array = np.array([-24, -18, -13, -30, 29])
>>> test_array = np.array([-6, 4, -6, 4, 8, -4, 8, -6, 2, 8])
Run Code Online (Sandbox Code Playgroud)
一得到
>>> indices
array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
>>> residual
array([ 7, 17, 7, 17, 21, 9, 21, 7, 15, 21])
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1816 次 |
| 最近记录: |