使用 numba 迭代多个 2d numpy 数组的最快方法

Nin*_*n17 7 python opencv numpy numba

当使用 numba 并访问多个 2d numpy 数组中的元素时,使用索引还是直接迭代数组更好,因为我发现两者的组合是最快的,这对我来说似乎违反直觉?或者还有其他更好的方法吗?

\n

对于上下文,我试图加快本文中光线追踪方法的实现https://iopscience.iop.org/article/10.1088/1361-6560/ac1f38/pdf

\n

我有一个函数,它获取传播前的强度和传播产生的位移图。所得强度则为由位移图逐像素位移的原始强度,其中子像素位移在各个相邻像素之间按比例共享。顺便说一句,这可以直接在 numpy 或另一个库中实现吗,因为我注意到它类似于 opencv 的重映射函数。

\n
import numpy as np\nfrom numba import njit\n\n@njit\ndef raytrace_range(intensity_0, d_y, d_x):\n    """\n\n    Args:\n\n        intensity_0 (2d numpy array): intensity before propagation\n        d_y (2d numpy array): Displacement along y in pixels\n        d_x (2d numpy array): Displacement along x in pixels\n\n    Returns:\n        intensity_z (2d numpy array): intensity after propagation \n\n    """\n    n_y, n_x = intensity_0.shape\n    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)\n    for i in range(n_x):\n        for j in range(n_y):\n            i_ij = intensity_0[i, j]\n            dx_ij=d_x[i,j]\n            dy_ij=d_y[i,j]\n\n\n            # Always the same from here down\n            if not dx_ij and not dy_ij:\n                intensity_z[i,j]+=i_ij\n                continue\n            i_new=i\n            j_new=j\n            #Calculating displacement bigger than a pixel\n            if np.abs(dx_ij)>1:\n                x = np.floor(dx_ij)\n                i_new=int(i+x)\n                dx_ij=dx_ij-x\n            if np.abs(dy_ij)>1:\n                y = np.floor(dy_ij)\n                j_new=int(j+y)\n                dy_ij=dy_ij-y\n            # Calculating sub-pixel displacement\n            if 0<=i_new and i_new<n_y and 0<=j_new and j_new<n_x:\n                intensity_z[i_new,j_new]+=i_ij*(1-np.abs(dx_ij))*(1-np.abs(dy_ij))\n                if i_new<n_y-1 and dx_ij>=0:\n                    if j_new<n_y-1 and dy_ij>=0:\n                        intensity_z[i_new+1, j_new]+=i_ij*dx_ij*(1-dy_ij)\n                        intensity_z[i_new+1, j_new+1]+=i_ij*dx_ij*dy_ij\n                        intensity_z[i_new, j_new+1]+=i_ij*(1-dx_ij)*dy_ij\n                    if j_new and dy_ij<0:\n                        intensity_z[i_new+1,j_new]+=i_ij*dx_ij*(1-np.abs(dy_ij))\n                        intensity_z[i_new+1,j_new-1]+=i_ij*dx_ij*np.abs(dy_ij)\n                        intensity_z[i_new,j_new-1]+=i_ij*(1-dx_ij)*np.abs(dy_ij)\n                if i_new and dx_ij<0:\n                    if j_new<n_x-1 and dy_ij>=0:\n                        intensity_z[i_new-1,j_new]+=i_ij*np.abs(dx_ij)*(1-dy_ij)\n                        intensity_z[i_new-1,j_new+1]+=i_ij*np.abs(dx_ij)*dy_ij\n                        intensity_z[i_new,j_new+1]+=i_ij*(1-np.abs(dx_ij))*dy_ij\n                    if j_new and dy_ij<0:\n                        intensity_z[i_new-1,j_new]+=i_ij*np.abs(dx_ij)*(1-np.abs(dy_ij))\n                        intensity_z[i_new-1,j_new-1]+=i_ij*dx_ij*dy_ij\n                        intensity_z[i_new,j_new-1]+=i_ij*(1-np.abs(dx_ij))*np.abs(dy_ij)\n    return intensity_z\n
Run Code Online (Sandbox Code Playgroud)\n

我已经尝试了其他一些方法,其中这是最快的(包括上面注释后的代码,# Always the same from here down我省略了该代码以使问题相对较短):

\n
@njit\ndef raytrace_enumerate(intensity_0, d_y, d_x):\n    n_y, n_x = intensity_0.shape\n    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)\n    for i, i_i in enumerate(intensity_0):\n        for j, i_ij in enumerate(i_i):\n            dx_ij=d_x[i,j]\n            dy_ij=d_y[i,j]\n
Run Code Online (Sandbox Code Playgroud)\n
@njit\ndef raytrace_npndenumerate(intensity_0, d_y, d_x):\n    n_y, n_x = intensity_0.shape\n    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)\n    for (i, j), i_ij  in np.ndenumerate(intensity_0):\n            dx_ij=d_x[i,j]\n            dy_ij=d_y[i,j]\n
Run Code Online (Sandbox Code Playgroud)\n
@njit\ndef raytrace_zip(intensity_0, d_y, d_x):\n    n_y, n_x = intensity_0.shape\n    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)\n    for i, (i_i, dy_i, dx_i) in enumerate(zip(intensity_0, d_y, d_x)):\n        for j, (i_ij, dy_ij, dx_ij) in enumerate(zip(i_i, dy_i, dx_i)):\n
Run Code Online (Sandbox Code Playgroud)\n
@njit\ndef raytrace_stack1(idydx):\n    n_y, _, n_x = idydx.shape\n    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)\n    for i, (i_i, dy_i, dx_i) in enumerate(idydx):\n        for j, (i_ij, dy_ij, dx_ij) in enumerate(zip(i_i, dy_i, dx_i)):\n
Run Code Online (Sandbox Code Playgroud)\n
@njit\ndef raytrace_stack2(idydx):\n    n_y, n_x, _ = idydx.shape\n    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)\n    for i, k in enumerate(idydx):\n        for j, (i_ij, dy_ij, dx_ij) in enumerate(k):\n
Run Code Online (Sandbox Code Playgroud)\n

补一下测试数据和时间:

\n
import timeit\nrng = np.random.default_rng()\nsize = (2010, 2000)\nmargin = 10\ntest_data = np.pad(10000*rng.random(size=size), margin)\ndx = np.pad(10*(rng.random(size=size)-0.5), margin)\ndy = np.pad(10*(rng.random(size=size)-0.5), margin)\n\n# Check results are the same\nL = [raytrace_range(test_data, dy, dx), raytrace_enumerate(test_data, dy, dx), raytrace_npndenumerate(test_data, dy, dx), raytrace_zip(test_data, dy, dx), raytrace_stack1(np.stack([test_data, dy, dx], axis=1)), raytrace_stack2(np.stack([test_data, dy, dx], axis=2))]\nprint((np.diff(np.vstack(L).reshape(len(L),-1),axis=0)==0).all())\n\n%timeit raytrace_range(test_data, dy, dx)\n%timeit raytrace_enumerate(test_data, dy, dx)\n%timeit raytrace_npndenumerate(test_data, dy, dx)\n%timeit raytrace_zip(test_data, dy, dx)\n%timeit raytrace_stack1(np.stack([test_data, dy, dx], axis=1)) #Note this would be the fastest if the arrays were pre-stacked\n%timeit raytrace_stack2(np.stack([test_data, dy, dx], axis=2))\n
Run Code Online (Sandbox Code Playgroud)\n

输出:

\n
True\n40.4 ms \xc2\xb1 233 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\n37.5 ms \xc2\xb1 117 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\n46.8 ms \xc2\xb1 112 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\n38.6 ms \xc2\xb1 243 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\n42 ms \xc2\xb1 234 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each) #Note this would be the fastest if the arrays were pre-stacked\n47.4 ms \xc2\xb1 203 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\n
Run Code Online (Sandbox Code Playgroud)\n

Jér*_*ard 2

一般来说,迭代数组的最快方法是基本的低级整数迭代器。这种模式会导致 Numba 中的转换次数最少,因此编译器应该能够很好地优化代码。由于未完美优化的间接代码转换,函数zip经常会增加额外的开销。enumerate

\n

这是一个基本示例:

\n
import numba as nb\n\n@nb.njit(\'(int_[::1],)\')\ndef test(arr):\n    s1 = s2 = 0\n    for i in range(arr.shape[0]):\n        s1 += i\n        s2 += arr[i]\n    return (s1, s2)\n\narr = np.arange(200_000)\ntest(arr)\n
Run Code Online (Sandbox Code Playgroud)\n

但是,当您同时读取/写入多个数组时(这就是您的情况),事情会变得更加复杂。事实上,Numpy 数组可以使用负索引进行索引,因此 Numba 需要每次都执行边界检查。与实际访问相比,此检查的成本很高,甚至可能破坏一些其他优化(例如矢量化)。

\n

因此,Numba 已经过优化,可以分析代码并检测不需要边界检查的情况,并防止在运行时添加昂贵的检查。上面的代码是这种情况,但您的raytrace_range函数中不是这种情况。enumerateenumerate+zip可以在很大程度上帮助消除边界检查,因为 Numba 可以轻松证明索引位于数组的边界内(理论上,它可以证明这一点,raytrace_range但遗憾的是当前的实现不够智能)。\n您基本上可以解决这个问题使用断言的问题。它不仅有利于优化,还能让你的代码更加健壮!

\n

此外,多维数组的索引有时并不能通过底层 JIT (LLVM-Lite) 完美优化。没有理由不对其进行优化,但编译器使用启发式方法来优化远非完美的代码(尽管平均而言相当不错)。您可以通过计算线条视图来提供帮助。不过,这通常会带来微小的改进。

\n

这是改进后的代码:

\n
@njit\ndef raytrace_range_opt(intensity_0, d_y, d_x):\n    n_y, n_x = intensity_0.shape\n    assert intensity_0.shape == d_y.shape\n    assert intensity_0.shape == d_x.shape\n\n    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)\n\n    for i in range(n_x):\n        row_intensity_0 = intensity_0[i, :]\n        row_d_x = d_x[i, :]\n        row_d_y = d_y[i, :]\n\n        for j in range(n_y):\n            assert j >= 0  # Crazy optimization (see later)\n\n            i_ij = row_intensity_0[j]\n            dx_ij = row_d_x[j]\n            dy_ij = row_d_y[j]\n\n            # Always the same from here down\n            if not dx_ij and not dy_ij:\n                row_intensity_0[j] += i_ij\n                continue\n\n            # Remaining code left unmodified\n
Run Code Online (Sandbox Code Playgroud)\n
\n

笔记

\n

请注意,我认为该函数的索引raytrace_enumerate伪造的:它应该是for i in range(n_y): for j in range(n_x):相反的,因为访问已完成intensity_0[i, j]并且您编写了n_y, n_x = intensity_0.shape。请注意,根据您的验证函数(这是可疑的),交换轴也会给出正确的结果。

\n

仅该assert j >= 0指令就可以实现 8% 的加速,这是疯狂的,因为如果整数迭代器j保证为正,则n_x数,因为它是一个形状,所以情况总是如此!这显然是 LLVM-Lite 无法优化的 Numba 优化失败(因为 LLVM-Lite 不知道什么是形状,而且它们也总是正数)。Numba 代码中明显缺失的假设导致额外的边界检查(三个数组中的每一个),这是相当昂贵的。

\n
\n

基准

\n

以下是我机器上的结果:

\n
raytrace_range:           47.8 ms \xc2\xb1 265 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\nraytrace_enumerate:       38.9 ms \xc2\xb1 208 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\nraytrace_npndenumerate:   54.1 ms \xc2\xb1 363 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\nraytrace_zip:             41 ms \xc2\xb1 657 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\nraytrace_stack1:          86.7 ms \xc2\xb1 268 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\nraytrace_stack2:          84 ms \xc2\xb1 432 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\n\nraytrace_range_opt:       38.6 ms \xc2\xb1 421 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\n
Run Code Online (Sandbox Code Playgroud)\n

正如你所看到的,raytrace_range_opt这是最快的实施这是我的机器上

\n