在python中向量化6 for循环累积总和

Yet*_*eti 5 python for-loop numpy vectorization

数学问题是:

在此处输入图片说明

总和中的表达式实际上比上面的表达式复杂得多,但这是一个最小的工作示例,不会使事情复杂化。我已经使用6个嵌套的for循环在Python中编写了此代码,并且正如预期的那样,即使在Numba,Cython和朋友的帮助下,它的性能也很差(真正的形式效果很差,需要评估数百万次)。在这里,它是使用嵌套的for循环和累积和编写的:

import numpy as np

def func1(a,b,c,d):
    '''
    Minimal working example of multiple summation
    '''
    B = 0
    for ai in range(0,a):
        for bi in range(0,b):
            for ci in range(0,c):
                for di in range(0,d):
                    for ei in range(0,ai+bi):
                        for fi in range(0,ci+di):
                            B += (2)**(ei-fi-ai-ci-di+1)*(ei**2-2*(ei*fi)-7*di)*np.math.factorial(ei)


    return a, b, c, d, B
Run Code Online (Sandbox Code Playgroud)

表达控制与4个数字作为输入,并为func1(4,6,3,4)输出为B是21769947.844726562。

我到处寻找帮助,并找到了一些Stack帖子,其中包括一些示例:

NumPy中的外部产品:矢量化六个嵌套循环

向量化具有不同数组形状的Python / Numpy中的Triple for循环

Python向量化嵌套循环

我尝试使用从这些有用的帖子中学到的知识,但是经过多次尝试,我一直得出错误的答案。即使对其中一个和进行矢量化处理,也将为真正的问题带来巨大的性能提升,但是和范围不同的事实似乎使我无法接受。有人对如何进行此操作有任何提示吗?

jde*_*esa 3

编辑3:

\n\n

最终(我认为)版本,更干净,更快地融合了max9111 答案的想法想法。

\n\n
import numpy as np\nfrom numba import as nb\n\n@nb.njit()\ndef func1_jit(a, b, c, d):\n    # Precompute\n    exp_min = 5 - (a + b + c + d)\n    exp_max = b\n    exp = 2. ** np.arange(exp_min, exp_max + 1)\n    fact_e = np.empty((a + b - 2))\n    fact_e[0] = 1\n    for ei in range(1, len(fact_e)):\n        fact_e[ei] = ei * fact_e[ei - 1]\n    # Loops\n    B = 0\n    for ai in range(0, a):\n        for bi in range(0, b):\n            for ci in range(0, c):\n                for di in range(0, d):\n                    for ei in range(0, ai + bi):\n                        for fi in range(0, ci + di):\n                            B += exp[ei - fi - ai - ci - di + 1 - exp_min] * (ei * ei - 2 * (ei * fi) - 7 * di) * fact_e[ei]\n    return B\n
Run Code Online (Sandbox Code Playgroud)\n\n

这已经比之前的任何选项都快得多,但我们仍然没有利用多个 CPU。一种方法是在函数本身内部实现,例如并行化外循环。这会在每次调用创建线程时增加一些开销,因此对于较小的输入实际上会慢一些,但对于较大的值应该会明显更快:

\n\n
import numpy as np\nfrom numba import as nb\n\n@nb.njit(parallel=True)\ndef func1_par(a, b, c, d):\n    # Precompute\n    exp_min = 5 - (a + b + c + d)\n    exp_max = b\n    exp = 2. ** np.arange(exp_min, exp_max + 1)\n    fact_e = np.empty((a + b - 2))\n    fact_e[0] = 1\n    for ei in range(1, len(fact_e)):\n        fact_e[ei] = ei * fact_e[ei - 1]\n    # Loops\n    B = np.empty((a,))\n    for ai in nb.prange(0, a):\n        Bi = 0\n        for bi in range(0, b):\n            for ci in range(0, c):\n                for di in range(0, d):\n                    for ei in range(0, ai + bi):\n                        for fi in range(0, ci + di):\n                            Bi += exp[ei - fi - ai - ci - di + 1 - exp_min] * (ei * ei - 2 * (ei * fi) - 7 * di) * fact_e[ei]\n        B[ai] = Bi\n    return np.sum(B)\n
Run Code Online (Sandbox Code Playgroud)\n\n

或者,如果您想要评估函数的许多点,您也可以在该级别进行并行化。这里a_arr, b_arr,c_arrd_arr是要计算函数值的向量:

\n\n
from numba import as nb\n\n@nb.njit(parallel=True)\ndef func1_arr(a_arr, b_arr, c_arr, d_arr):\n    B_arr = np.empty((len(a_arr),))\n    for i in nb.prange(len(B_arr)):\n        B_arr[i] = func1_jit(a_arr[i], b_arr[i], c_arr[i], d_arr[i])\n    return B_arr\n
Run Code Online (Sandbox Code Playgroud)\n\n

最佳配置取决于您的输入、使用模式、硬件等,因此您可以结合不同的想法来适合您的情况。

\n\n
\n\n

编辑2:

\n\n

其实,忘记我之前说的话了。最好的办法是以更有效的方式 JIT 编译算法。首先计算昂贵的部分(我采用指数和阶乘),然后将其传递给编译后的循环函数:

\n\n
import numpy as np\nfrom numba import njit\n\ndef func1(a, b, c, d):\n    exp_min = 5 - (a + b + c + d)\n    exp_max = b\n    exp = 2. ** np.arange(exp_min, exp_max + 1)\n    ee = np.arange(a + b - 2)\n    fact_e = scipy.special.factorial(ee)\n    return func1_inner(a, b, c, d, exp_min, exp, fact_e)\n\n@njit()\ndef func1_inner(a, b, c, d, exp_min, exp, fact_e):\n    B = 0\n    for ai in range(0, a):\n        for bi in range(0, b):\n            for ci in range(0, c):\n                for di in range(0, d):\n                    for ei in range(0, ai + bi):\n                        for fi in range(0, ci + di):\n                            B += exp[ei - fi - ai - ci - di + 1 - exp_min] * (ei * ei - 2 * (ei * fi) - 7 * di) * fact_e[ei]\n    return B\n
Run Code Online (Sandbox Code Playgroud)\n\n

在我的实验中,这是迄今为止最快的选项,并且几乎不需要额外的内存(仅预先计算的值,其大小与输入成线性关系)。

\n\n
a, b, c, d = 4, 6, 3, 4\n# The original function\n%timeit func1_orig(a, b, c, d)\n# 2.07 ms \xc2\xb1 33.7 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 100 loops each)\n# The grid-evaluated function\n%timeit func1_grid(a, b, c, d)\n# 256 \xc2\xb5s \xc2\xb1 25 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10000 loops each)\n# The precompuation + JIT-compiled function\n%timeit func1_jit(a, b, c, d)\n# 19.6 \xc2\xb5s \xc2\xb1 3.25 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 100000 loops each)\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

好吧,总是可以选择对整个事情进行网格评估:

\n\n
import numpy as np\nimport scipy.special\n\ndef func1(a, b, c, d):\n    ai, bi, ci, di, ei, fi = np.ogrid[:a, :b, :c, :d, :a + b - 2, :c + d - 2]\n    # Compute\n    B = (2.) ** (ei - fi - ai - ci - di + 1) * (ei ** 2 - 2 * (ei * fi) - 7 * di) * scipy.special.factorial(ei)\n    # Mask out of range elements for last two inner loops\n    m = (ei < ai + bi) & (fi < ci + di)\n    return np.sum(B * m)\n\nprint(func1(4, 6, 3, 4))\n# 21769947.844726562\n
Run Code Online (Sandbox Code Playgroud)\n\n

我使用它是scipy.special.factorial因为np.factorial由于某种原因显然不适用于数组。

\n\n

显然,随着参数的增加,内存成本会增长得非常快。该代码实际上执行了不必要的计算,因为两个内部循环的迭代次数不同,因此(在此方法中)您必须使用最大的计算,然后删除不需要的计算。希望矢量化能够弥补这一点。一个小型 IPython 基准测试:

\n\n
a, b, c, d = 4, 6, 3, 4\n# func1_orig is the original loop-based version\n%timeit func1_orig(a, b, c, d)\n# 2.9 ms \xc2\xb1 110 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 100 loops each)\n# func1 here is the vectorized version\n%timeit func1(a, b, c, d)\n# 210 \xc2\xb5s \xc2\xb1 6.34 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 1000 loops each)\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

编辑:

\n\n

请注意,之前的方法也不是全有或全无的事情。您可以选择仅对某些循环进行网格评估。例如,两个最里面的循环可以像这样矢量化:

\n\n
def func1(a, b, c, d):\n    B = 0\n    e = np.arange(a + b - 2).reshape((-1, 1))\n    f = np.arange(c + d - 2)\n    for ai in range(0, a):\n        for bi in range(0, b):\n            ei = e[:ai + bi]\n            for ci in range(0, c):\n                for di in range(0, d):\n                    fi = f[:ci + di]\n                    B += np.sum((2.) ** (ei - fi - ai - ci - di + 1) * (ei ** 2 - 2 * (ei * fi) - 7 * di) * scipy.special.factorial(ei))\n    return B\n
Run Code Online (Sandbox Code Playgroud)\n\n

这仍然有循环,但它确实避免了额外的计算,并且内存需求要低得多。哪一个最好取决于我猜输入的大小。在我的测试中,使用原始值 (4, 6, 3, 4),这甚至比原始函数慢;另外,对于这种情况,为每个循环创建新数组似乎eifi在预先创建的数组的切片上操作要快。然而,如果将输入乘以 4 (14, 24, 12, 16),那么这比原始值(大约 x5)快得多,尽管仍然比完全矢量化的(大约 x3)慢。另一方面,我可以用这个(大约 5 分钟)计算输入缩放十倍(40,60,30,40)的值,但由于内存的原因不能用前一个来计算(我没有测试使用原始函数需要多长时间)。使用@numba.jit有一点帮助,尽管不是很大(nopython由于阶乘函数而无法使用)。您可以根据输入的大小尝试对更多或更少的循环进行矢量化。

\n