mds*_*slt 5 python search numpy numpy-ndarray
请假设以下 NumPy 数组:
A = array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
Run Code Online (Sandbox Code Playgroud)
我想找到该数组的N连续值等于零(包括零)的索引。
例如,假设N=3. 我们知道,A[2]=0虽然A[3]>0. 因此,数组的第二个元素A不具有三个连续的零值(包括)。数组A的理想结果如下所示:
B = array([False, False, False, False, True, True, True, True, False, False])
Run Code Online (Sandbox Code Playgroud)
我可以写一个循环作为这个问题的答案:
N = 3
A = np.array([1, 1, 0, 0, 0, 0, 0, 0, 0, 0])
B = np.zeros(len(A), dtype=bool)
for i in range(len(A)):
if (i + N <= len(A)) and (sum(A[i:i + N]) == 0):
B[i] = True
Run Code Online (Sandbox Code Playgroud)
由于矩阵A可能更大,并且我需要执行上述搜索过程数百万次,因此我需要尽可能最快的方法。您的建议是什么?
只要块大小N(下面称为m或size)足够大,并且输入具有大量非零值,这就是对原始方法的重大改进:
import numpy as np\nimport numba as nb\n\n\n@nb.njit\ndef find_blocks_while2_nb(arr, size, value):\n n = arr.size\n result = np.zeros(n, dtype=np.bool_)\n i = j = 0\n while i < n - size + 1:\n # last iteration found a block, only checking one value\n if j < i and arr[i + size - 1] == value:\n result[i] = True\n i += 1\n else:\n # search backward for index of last non-block value\n j = i + size - 1\n while j >= i and arr[j] == value:\n j -= 1\n if j < i:\n # block found, advance by 1\n result[i] = True\n i += 1\n else:\n # block not found, advance by last non-block value occurrence\n j += 1\n i = j\n return result\nRun Code Online (Sandbox Code Playgroud)\n本质上,它是原始方法的 Numba 加速版本,具有许多微观优化:
\n对于小块大小,更微观优化的方法可能会更快,如其他答案中介绍的那样,并在下面简要讨论。
\n查找相同值块开始处的索引的问题可以视为计算机科学中经典问题的特化:在数组中查找子数组(此处详细讨论)。
\n在这种情况下,子串(即块)由一系列相同的值组成,更准确地说是0s。
一般来说,这个问题的复杂性取决于数组 ( n) 的大小和块 (m或size) 的大小,以及它们中的实际值(显然)。
当然,假设n > m,因为很明显,如果n == m解决方案是微不足道的,并且当n < m数组无法容纳该块时。\n出于同样的原因,找不到上面的索引n - m。
虽然 na\xc3\xafve 实现本质上需要两个嵌套循环,一个用于字符串,一个用于子字符串(这会导致O(nm)计算复杂性),但可以利用该块由相同值组成的事实来在保证的线性时间O(n)和恒定的内存需求下运行算法O(1)。
然而,假设输入数组具有块值和非块值的混合分布,则可以运行大约。O(n / (m - \xce\xb5))时间并且仍然有O(1)内存消耗。该\xce\xb5 < m - 1参数与找到非块值的概率成正比。
这个想法是,当尝试确定块是否从位置开始时,如果数组在位置处i具有非块值,那么要评估的下一个位置是,因为该块不能容纳在和 之间。\n此外,如果搜索开始从后面(即在位置),则对于足够异构的输入,未检查的位置数量是最大的,因为不需要检查所有值。jj - i < mj + 1iji - mj - i
出于类似的原因,如果已知块从位置开始i,则检查块是否从位置开始时i + j,只需要检查和j < m之间的值,并且不需要再次检查和之间的值。i + mi + m + jii + m - 1
这是一个手动情况,生成器非常适合,因为可以利用其惰性求值属性仅根据需要执行计算位。\n此外,事先并不知道结果有多大(即可能会找到很多块或根本没有块)。
\n但是,如果这是较大计算的一部分,则预分配结果的大小并仅修改其索引的值可能会更快。\n不需要分配多个值。n - m + 1\n但是,为了兼容根据问题的要求,预先分配输入大小的数组。\n如果需要,可以轻松调整以下实现以避免浪费这些额外的内存。\n更准确地说,可以转换基于显式循环的解决方案通过替换result[i] = True为yield i(或类似的)来转换为生成器。\n同样,本地计算布尔数组的解决方案可以转换为生成器,如下所示:
for i in np.nonzero(result)[0]:\n yield i\nRun Code Online (Sandbox Code Playgroud)\n分析的其余部分将假设目标结果是布尔数组。
\n在纯 Python 中实现这一点的挑战之一是显式循环相对较慢。\n因此,可以方便地使用一些加速技术(例如 Numba 或 Cython)来减少计算时间。
\n或者,假设该算法本质上由两个循环组成(一个用于数组,一个用于子数组),人们可能会寻求对一个或两个循环进行矢量化。\n但是,请注意,这些方法必然会在最坏情况下O(nm)运行。
基本的 na\xc3\xafve 方法如下:
\ndef find_blocks_for2(arr, size, value):\n n = arr.size\n result = np.zeros(n, dtype=np.bool_)\n for i in range(n - size + 1):\n all_equal = True\n for j in range(i, i + size):\n if arr[j] != value:\n all_equal = False\n break # vectorized approaches may not have this\n if all_equal:\n result[i] = True\n return result\nRun Code Online (Sandbox Code Playgroud)\n这可以在多个方向上进行改进。\n一种方法源于内部循环实际上是无用的事实,因为它足以跟踪存在的连续“好”值的数量(这本质上与MichaelSzczesny相同) \ 的回答):
\nimport numpy as np\n\n\ndef find_blocks_for(arr, size, value):\n n = arr.size\n result = np.zeros(n, dtype=np.bool_)\n block_size = 0\n for i in range(n):\n if arr[i] == value:\n block_size += 1\n else:\n block_size = 0\n if block_size >= size:\n result[i - size + 1] = True\n return result\nRun Code Online (Sandbox Code Playgroud)\n请注意,这可以视为Rabin-Karp 算法的特化,其中哈希的作用由block_size计数器发挥。
另一种可能的方法是仅当最后一次迭代已经识别出块的存在并使其向后运行以最大化跳转到下一个可能的块时才跳过第二个循环:
\nimport numpy as np\n\n\ndef find_blocks_while2(arr, size, value):\n n = arr.size\n result = np.zeros(n, dtype=np.bool_)\n i = j = 0\n while i < n - size + 1:\n # last iteration found a block, only checking one value\n if j < i and arr[i + size - 1] == value:\n result[i] = True\n i += 1\n else:\n # search backward for index of last non-block value\n j = i + size - 1\n while j >= i and arr[j] == value:\n j -= 1\n if j < i:\n # block found, advance by 1\n result[i] = True\n i += 1\n else:\n # block not found, advance by last non-block value occurrence\n j += 1\n i = j\n return result\nRun Code Online (Sandbox Code Playgroud)\n请注意,这可以看作是Knuth-Morris-Pratt 算法的特化,无需计算块的每个值的偏移量(它们都是相同的)。
\n可以为上述方法设计许多变体,所有变体都具有相同的渐近行为,但运行时间略有不同,具体取决于实际输入。\n例如,可以更改条件的顺序以加速许多连续的情况找到块,代价是在其他地方运行更多指令,反之亦然。
\n所有这些在没有使用 Numba、Cython 或 Pypy 加速的情况下都运行得太慢。\nPypy 使用不同的解释器,我不会进一步讨论/基准测试它。
\nNumba 提供了最直接、最有效的加速之一(只需应用装饰器),例如:
\nimport numba as nb\n\n\nfind_blocks_while2_nb = nb.njit(find_blocks_while2)\nfind_blocks_while2_nb.__name__ = "find_blocks_while2_nb"\nRun Code Online (Sandbox Code Playgroud)\n也可以使用 Cython,例如(使用%load_ext CythonJupyter magic),但要充分利用编译,需要额外小心以确保瓶颈代码在没有 Python 交互的情况下运行:
%%cython -c-O3 -c-march=native -a\n#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True\n\n\nimport cython as cy\nimport numpy as np\n\n\ncdef _find_blocks_lin_cy(long[::1] arr, char[::1] result, Py_ssize_t n, Py_ssize_t size, long value):\n cdef Py_ssize_t block_size = 0\n for i in range(n):\n if arr[i] == value:\n block_size += 1\n else:\n block_size = 0\n if block_size >= size:\n result[i - size + 1] = True\n\n\ndef find_blocks_lin_cy(arr, size, value):\n n = arr.size\n result = np.zeros(n, dtype=np.bool_)\n _find_blocks_lin_cy(arr, result, n, size, value)\n return result\nRun Code Online (Sandbox Code Playgroud)\nNumba 还是 Cython 更快取决于代码可以触发多少优化,因此也取决于使用哪种编译器和 CPU 组合。
\n或者,可以对内部循环或外部循环(或两者)进行矢量化。\n当然,这对于在内部循环或外部循环中运行更多是最有效的。\n应该对最大的循环进行矢量化以获得最大加速。
\n对内部循环进行矢量化将导致与此接近的结果(与原始帖子类似,但边界较小,并sum()替换为np.all()也运动短路):
def find_blocks_for_all(arr, size, value):\n n = arr.size\n result = np.zeros(n, dtype=np.bool_)\n for i in range(n - size + 1):\n if np.all(arr[i:i + size] == value):\n result[i] = True\n return result\nRun Code Online (Sandbox Code Playgroud)\n向量化外循环将导致与此接近的结果(类似于DanielF\'s 答案中提出的内容,但整体代码更清晰,特别是避免不必要的函数调用):
\ndef find_blocks_for_and(arr, size, value):\n result = (arr == value)\n for i in range(1, size):\n result[:-1] &= result[1:]\n result[1 - size:] = False\n return result\nRun Code Online (Sandbox Code Playgroud)\n对两个循环进行向量化将导致与此接近的结果(类似于NaphatAmundsen\'s 答案中的一种方法):
\ndef find_blocks_strides(arr, size, value):\n n = arr.size\n strides = (arr.itemsize,) * 2\n block = np.full(size, value, dtype=arr.dtype)\n # note that `as_strided()` does not allocate extra memory\n # the downside is that buffer overflow will occur and\n # extra care must be taken not to use data outside the boundaries\n check = np.lib.stride_tricks.as_strided(arr, (n, size), strides)\n result = np.all(check == block, axis=-1)\n result[1 - size:] = False\n return result\nRun Code Online (Sandbox Code Playgroud)\n上面的代码可以进一步专门化,假设输入仅包含正值并且该块仅由零组成。\n这意味着将调用替换np.all()为np.sum()或 类似的。\n实际上有益的一种方法是使用完全矢量化方法,其中as_strided()可以通过计算与非零块的相关性来替换(类似于NaphatAmundsen 的答案中的一种方法):
def find_blocks_0_conv(arr, size):\n n = arr.size\n result = np.zeros_like(arr, dtype=np.bool_)\n block = np.ones(size, dtype=arr.dtype)\n result[:1 - size] = (np.correlate(arr, block, \'valid\') == 0)\n return result\nRun Code Online (Sandbox Code Playgroud)\n除了循环之外,运行时间还取决于代码如何转换为硬件指令以及它们最终的速度。\n例如,如果 NumPy 是在完全支持 SIMD 指令的情况下编译的,则很可能是矢量化 (对于许多输入组合,理论上最佳的方法会优于经过 na\xc3\xafvely 加速的理论上最佳的方法。\n对于以加速技术可以很好地利用 SIMD 指令的方式编写的方法,也会发生同样的情况\n但是,它们提供的额外速度将仅限于特定系统(即编译器和处理器的特定组合)。
\n一个这样的例子如下(本质上是对J\xc3\xa9r\xc3\xb4me Richard 的答案的完善,以避免全局变量并删除不必要的循环):
\ndef find_blocks_simd_nb_cached(arr, size, value=0, cache={}):\n assert size > 0\n\n if size not in cache:\n print(\'cached: \', list(cache.keys()))\n def find_blocks_simd_nb(arr, value):\n n = arr.size\n result = np.zeros(n, dtype=np.bool_)\n check = (arr == value)\n for i in range(n - size + 1):\n # using a `temp` variable avoid unnecessarily accessing the `check` array\n temp = check[i]\n for j in range(1, size):\n temp &= check[i + j]\n result[i] = temp\n return result\n\n cache[size] = nb.njit(find_blocks_simd_nb)\n return cache[size](arr, value)\nRun Code Online (Sandbox Code Playgroud)\n这里报告了一些基准。\n与往常一样,我们应该对它们持保留态度。
\n对于不同的输入大小、块大小和值,仅考虑最快的方法。\n_nb和_cy(除了find_blocks_for_cy()本质上是相同函数的非优化版本,没有上面的_nb或_cy后缀。
注意:
\nfind_blocks_for_all()对于足够大的值来说实际上是很快的sizefind_blocks_for_and()对于较小的值非常快size并且不需要 Numba 或 Cythonfind_blocks_for_nb()本质上与块大小无关并且整体性能非常好find_blocks_simd_nb()对于较小的值,速度非常快,size但对于较大的值,性能相对较差find_blocks_while2_nb()对于较大的值来说确实很出色size,对于较小的值来说速度相当快(此处提供完整的基准测试)。
\n对于较小的值N(在运行时变化不大),最佳解决方案是使用专门针对每个可能的特定值的SIMD 友好的 Numba 实现N。编译器可以生成非常高效的代码N <= ~30。对于较大的N值,@norok2 的解决方案开始变得更好,因为只要没有太多 0 项,它就可以跳过数组的许多项。当 0 的数量较多时,只要 ,该函数仍然具有竞争力N <= 64。当 时N > 64,请使用更适合的实现。
# Cache dictionary meant to store function for a given N\ncache = {}\n\ndef compute(A, N):\n assert N > 0\n if N not in cache:\n # Numba function to be compiled for a specific N\n def compute_specific(A):\n out = np.zeros(A.size, dtype=np.bool_)\n isZero = (A == 0).view(np.uint8)\n for i in range(isZero.size-N+1):\n allSame = isZero[i]\n for j in range(1, N):\n allSame &= isZero[i+j]\n out[i] = allSame\n for i in range(A.size-N+1, A.size):\n out[i] = 0\n return out\n cache[N] = nb.njit(compute_specific)\n return cache[N](A)\nRun Code Online (Sandbox Code Playgroud)\n编译器可以使用SIMD 指令(如 AVX-2)自动向量化此代码,以便在现代 x86 处理器上每个周期计算 32~64 个项目。较大的N值需要更多的 SIMD 指令,导致函数效率较低。相比之下,使用条件分支的实现往往很慢,因为在主流 x86 处理器上,错误预测的分支通常会花费大约 10~15 个周期,并且每次迭代一次只能计算一个标量项(即,假设慢约 2 个数量级)工作量是相同的)。
以下是我的机器(Intel Xeon Skylake)上的结果,其中包含大小为 100_000 的随机(32 位)整数数组。数组中大约有 50% 的值是 0。所有实现中都不包括 Numba 编译时间。
\nWith N = 3:\n - Initial solution: 115_000 \xc2\xb5s\n - Ali_Sh: 1_575 \xc2\xb5s\n - Naphat (stridetricks): 929 \xc2\xb5s\n - Naphat (correlational): 528 \xc2\xb5s\n - Michael Szczesny: 350 \xc2\xb5s\n - Norok2: 306 \xc2\xb5s\n - Franciska: 295 \xc2\xb5s\n - This code: 11 us <---\n\nWith N = 10:\n - Initial solution: 149_000 \xc2\xb5s\n - Ali_Sh: 1_592 \xc2\xb5s\n - Naphat (stridetricks): 1_139 \xc2\xb5s\n - Naphat (correlational): 831 \xc2\xb5s\n - Michael Szczesny: 489 \xc2\xb5s\n - Franciska: 444 \xc2\xb5s\n - Norok2: 98 \xc2\xb5s\n - This code: 14 \xc2\xb5s <---\n\nWith N = 30:\n - Ali_Sh: [FAIL]\n - Initial solution: 255_000 \xc2\xb5s\n - Franciska: 2_285 \xc2\xb5s\n - Naphat (correlational): 1_936 \xc2\xb5s\n - Naphat (stridetricks): 1_628 \xc2\xb5s\n - Michael Szczesny: 647 \xc2\xb5s\n - Norok2: 30 \xc2\xb5s\n - This code: 25 \xc2\xb5s <---\n\nWith N = 60:\n - Ali_Sh: [FAIL]\n - Initial solution: 414_000 \xc2\xb5s\n - Naphat (correlational): 3_814 \xc2\xb5s\n - Franciska: 3_242 \xc2\xb5s\n - Naphat (stridetricks): 3_048 \xc2\xb5s\n - Michael Szczesny: 816 \xc2\xb5s\n - This code: 50 \xc2\xb5s <---\n - Norok2: 14 \xc2\xb5s\nRun Code Online (Sandbox Code Playgroud)\n我们可以清楚地看到,除了 N 很大时(在这种情况下,Norok2 的代码更快),该函数是最快的函数(大幅)。
\n| 归档时间: |
|
| 查看次数: |
590 次 |
| 最近记录: |