NumPy:Vectorize为另一个数组中的每个元素在数组中查找最接近的值

Nip*_*tra 5 python algorithm numpy vectorization cython

输入

known_array:numpy数组; 仅由标量值组成;shape: (m, 1)

test_array:numpy数组; 仅由标量值组成;shape: (n, 1)

产量

indices:numpy数组; shape: (n, 1); 对于每个值,test_array查找最接近的值的索引known_array

residual:numpy数组; shape: (n, 1); 对于每个值,test_array找到与最接近的值的差异known_array

In [17]: known_array = np.array([random.randint(-30,30) for i in range(5)])

In [18]: known_array
Out[18]: array([-24, -18, -13, -30,  29])

In [19]: test_array = np.array([random.randint(-10,10) for i in range(10)])

In [20]: test_array
Out[20]: array([-6,  4, -6,  4,  8, -4,  8, -6,  2,  8])
Run Code Online (Sandbox Code Playgroud)

示例实现(未完全矢量化)

def find_nearest(known_array, value):
    idx = (np.abs(known_array - value)).argmin()
    diff = known_array[idx] - value
    return [idx, -diff]

In [22]: indices = np.zeros(len(test_array))

In [23]: residual = np.zeros(len(test_array))

In [24]: for i in range(len(test_array)):
   ....:     [indices[i], residual[i]] =  find_nearest(known_array, test_array[i])
   ....:     

In [25]: indices
Out[25]: array([ 2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.])

In [26]: residual
Out[26]: array([  7.,  17.,   7.,  17.,  21.,   9.,  21.,   7.,  15.,  21.])
Run Code Online (Sandbox Code Playgroud)

加快这项任务的最佳方法是什么?Cython是一个选项,但是,我总是希望能够删除for循环并让代码保持纯粹的NumPy.


注意:请参阅Stack Overflow问题

  1. Python/Numpy - 快速查找最接近某些值的数组中的索引
  2. 找到数值最接近的值的索引
  3. 在numpy数组中查找最接近的值
  4. 查找最近的值并返回Python中的数组索引
  5. 在Python中查找两个列表/数组中最近的项目

更新

我做了一些小基准来比较非矢量化和矢量化解决方案(接受的答案).

In [48]: [indices1, residual1] = find_nearest_vectorized(known_array, test_array)

In [53]: [indices2, residual2] = find_nearest_non_vectorized(known_array, test_array)

In [54]: indices1==indices2
Out[54]: array([ True,  True,  True,  True,  True,  True,  True,  True,  True,  True],   dtype=bool)

In [55]: residual1==residual2
Out[55]: array([ True,  True,  True,  True,  True,  True,  True,  True,  True,  True], dtype=bool)

In [56]: %timeit [indices2, residual2] = find_nearest_non_vectorized(known_array, test_array)
10000 loops, best of 3: 173 µs per loop

In [57]: %timeit [indices1, residual1] = find_nearest_vectorized(known_array, test_array)
100000 loops, best of 3: 16.8 µs per loop
Run Code Online (Sandbox Code Playgroud)

大概加速10倍!

澄清

known_array 没有排序.

我在下面的@cyborg的答案中给出了基准测试.

案例1:如果known_array被分类

known_array = np.arange(0,1000)
test_array = np.random.randint(0, 100, 10000)
print('Speedups:')
base_time = time_f('base')
for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
    print func_name + ' is x%.1f faster than base.' % (base_time / time_f(func_name))
    assert np.allclose(base(known_array, test_array), eval(func_name+'(known_array, test_array)'))
Run Code Online (Sandbox Code Playgroud)
Speedups:
diffs is x0.4 faster than base.
searchsorted1 is x81.3 faster than base.
searchsorted2 is x107.6 faster than base.
Run Code Online (Sandbox Code Playgroud)

首先,对于大型数组,diffs方法实际上速度较慢,它也占用了大量的RAM,当我在实际数据上运行时,我的系统挂起了.

案例2:什么时候known_array没有排序; 这代表实际情况

known_array = np.random.randint(0,100,100)
test_array = np.random.randint(0, 100, 100)
Run Code Online (Sandbox Code Playgroud)
Speedups:
diffs is x8.9 faster than base.
AssertionError                            Traceback (most recent call last)
<ipython-input-26-3170078c217a> in <module>()
      5 for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
      6     print func_name + ' is x%.1f faster than base.' % (base_time /  time_f(func_name))
----> 7     assert np.allclose(base(known_array, test_array),  eval(func_name+'(known_array, test_array)'))

AssertionError: 


searchsorted1 is x14.8 faster than base.
Run Code Online (Sandbox Code Playgroud)

我还必须评论该方法也应该是内存有效的.否则我的8 GB RAM是不够的.在基本情况下,它很容易就足够了.

HYR*_*YRY 6

如果数组很大,您应该使用searchsorted:

import numpy as np
np.random.seed(0)
known_array = np.random.rand(1000)
test_array = np.random.rand(400)

%%time
differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
indices = np.abs(differences).argmin(axis=0)
residual = np.diagonal(differences[indices,])
Run Code Online (Sandbox Code Playgroud)

输出:

CPU times: user 11 ms, sys: 15 ms, total: 26 ms
Wall time: 26.4 ms
Run Code Online (Sandbox Code Playgroud)

searchsorted 版:

%%time

index_sorted = np.argsort(known_array)
known_array_sorted = known_array[index_sorted]

idx1 = np.searchsorted(known_array_sorted, test_array)
idx2 = np.clip(idx1 - 1, 0, len(known_array_sorted)-1)

diff1 = known_array_sorted[idx1] - test_array
diff2 = test_array - known_array_sorted[idx2]

indices2 = index_sorted[np.where(diff1 <= diff2, idx1, idx2)]
residual2 = test_array - known_array[indices]
Run Code Online (Sandbox Code Playgroud)

输出:

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 311 µs
Run Code Online (Sandbox Code Playgroud)

我们可以检查结果是否相同:

assert np.all(residual == residual2)
assert np.all(indices == indices2)
Run Code Online (Sandbox Code Playgroud)


cyb*_*org 5

TL; DR:使用numpy.searchsorted().

import inspect
from timeit import timeit
import numpy as np

known_array = np.arange(-10, 10)
test_array = np.random.randint(-10, 10, 1000)
number = 1000

def base(known_array, test_array):
    def find_nearest(known_array, value):
        idx = (np.abs(known_array - value)).argmin()
        return idx
    indices = np.zeros_like(test_array, dtype=known_array.dtype)
    for i in range(len(test_array)):
        indices[i] =  find_nearest(known_array, test_array[i])
    return indices

def diffs(known_array, test_array):
    differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
    indices = np.abs(differences).argmin(axis=0)
    return indices

def searchsorted1(known_array, test_array):
    index_sorted = np.argsort(known_array)
    known_array_sorted = known_array[index_sorted]
    idx1 = np.searchsorted(known_array_sorted, test_array)
    idx1[idx1 == len(known_array)] = len(known_array)-1
    idx2 = np.clip(idx1 - 1, 0, len(known_array_sorted)-1)
    diff1 = known_array_sorted[idx1] - test_array
    diff2 = test_array - known_array_sorted[idx2]
    indices2 = index_sorted[np.where(diff1 <= diff2, idx1, idx2)]
    return indices2

def searchsorted2(known_array, test_array):
    index_sorted = np.argsort(known_array)
    known_array_sorted = known_array[index_sorted]
    known_array_middles = known_array_sorted[1:] - np.diff(known_array_sorted.astype('f'))/2
    idx1 = np.searchsorted(known_array_middles, test_array)
    indices = index_sorted[idx1]
    return indices

def time_f(func_name):
    return timeit(func_name+"(known_array, test_array)",
        'from __main__ import known_array, test_array, ' + func_name, number=number)

print('Speedups:')
base_time = time_f('base')
for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
    print func_name + ' is x%.1f faster than base.' % (base_time / time_f(func_name))
Run Code Online (Sandbox Code Playgroud)

输出:

Speedups:
diffs is x29.9 faster than base.
searchsorted1 is x37.4 faster than base.
searchsorted2 is x64.3 faster than base.
Run Code Online (Sandbox Code Playgroud)


alk*_*lko 4

例如,您可以计算 on go 中的所有差异:

differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
Run Code Online (Sandbox Code Playgroud)

并使用argmin花哨的索引来np.diagonal获得所需的索引和差异:

indices = np.abs(differences).argmin(axis=0)
residual = np.diagonal(differences[indices,])
Run Code Online (Sandbox Code Playgroud)

因此对于

>>> known_array = np.array([-24, -18, -13, -30,  29])
>>> test_array = np.array([-6,  4, -6,  4,  8, -4,  8, -6,  2,  8])
Run Code Online (Sandbox Code Playgroud)

一得到

>>> indices
array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
>>> residual
array([ 7, 17,  7, 17, 21,  9, 21,  7, 15, 21])
Run Code Online (Sandbox Code Playgroud)