如何在 Python 中优化和加速矩阵乘法

Zub*_*aki 5 python numpy matrix-multiplication

根据梯度方程,矩阵乘法由下式给出 在此输入图像描述

@和都*需要的地方。如果读者有兴趣的话,这是代码:

# parameters
beta     =  0.98 
alpha    =  0.03
delta    =  0.1
T        =  1000
loop     =  1
dif      =  1
tol      =  1e-8

kss      =  ((1 / beta - (1 - delta)) / alpha)**(1 / (alpha - 1))
k        =  np.linspace(0.5 * kss, 1.8 * kss, T)

k_reshaped   =  k.reshape(-1, 1)
c            =  k_reshaped ** alpha + (1 - delta) * k_reshaped - k
c[c<0]       =  1e-11
c            =  np.log(c)
beta_square  =  beta**2

# multiplication
I   =  np.identity(T)
E   =  np.ones(T)[:,None]
Q2  =  I

while np.any(dif > tol) and loop < 200:
    J   =  beta * Q2
    B   =  inv(I - J)

    Q3  =  np.zeros([T,T])
    ini =  np.argmax(c + (B @ (J * c) @ E).flatten(),axis=1)
    Q3[np.arange(T),ini]  =  1


    gB  =  2 * B @ (J * c @ E) @ (beta * Q2 * c @ E + B @ (np.linalg.matrix_power(I - J, 2) * c @ E)).T / beta_square
    B   += 0.1 * gB

    dif  =  np.max(np.absolute(Q3 - Q2))
    kcQ  =  k[ini]

    Q2   =  Q3
    loop += 1
Run Code Online (Sandbox Code Playgroud)

基本上,它遵循随机梯度下降算法,矩阵B由 初始化并由B = inv(I - J)演化B += 0.1 * gBJ随 变化Q2,并且Q2需要在每次迭代中确定。然而Q2,它是一个稀疏矩阵,每一列只有一个 1,其余为零,在代码中它是这样的:

ini =  np.argmax(c + (B @ (J * c) @ E).flatten(),axis=1)
Q3[np.arange(T),ini]  =  1
...
Q2  =  Q3
Run Code Online (Sandbox Code Playgroud)

该代码当前演示了 1000 x 1000 矩阵运算,是否可以对其进行优化并运行得更胖?

Jér*_*ard 4

以下是一些改进:

\n
    \n
  • beta * Q2计算了两次,J可以在第二次使用。
  • \n
  • J * c虽然可以执行一次,但也可以多次计算。对于 也一样I - J
  • \n
  • B @ (J * c) @ EB @ (J * c @ E)在数学上是等价的,但后者在您的情况下更快,并且也可以计算一次。
  • \n
  • CPython(几乎)不优化任何内容,而 Numpy 急切地执行运算,因此0.1 * (2 * Matrix / beta_square)实际上计算一个新矩阵M2 = 2 * Matrix,然后是一个新矩阵M3 = M2 / beta_square,然后是另一个矩阵M4 = 0.1 * M4。创建许多像这样的临时矩阵是昂贵的,因为它是内存绑定操作,并且现代机器上的内存带宽非常有限(与计算能力相比),更不用说填充新的临时数组通常比已填充的数组慢(因为虚拟内存,更具体地说是页面错误)。因此,它的速度更快(0.1 * 2 / beta_square) * Matrix(因为乘以浮点数比乘以大矩阵要快得多)。
  • \n
  • 一些基本操作(例如np.argmax(c + tmp3.flatten(), axis=1)或 )np.max(np.absolute(Q3 - Q2))可以使用 Numba 轻松加速。事实上,大多数
  • \n
  • 就地操作通常比异地操作更快(同样是因为昂贵的临时数组)。您可以通过out基本函数的参数(例如np.multiply(A, B, out=C))来使用它们。话虽如此,这里的好处很小,因为inv需要很长时间。
  • \n
  • 假设B循环末尾不需要,您可以使用np.linalg.solveHomer512 提到的替代。O(n**3)对于大型矩阵(相对于) ,求解系统的速度要快得多O(n**2),而且通常更准确。请参阅Don\xe2\x80\x99t 反转该矩阵。例如,inv(I-J) @ b可以替换为solve(I-J, b). solve尽管在您的特定用例中,由于稀疏矩阵,使用的好处并不是那么大I-J
  • \n
  • 如果B实际使用的是,在循环的末尾,那么这有点复杂。Numba 可以帮助编写一个相对快速的矩阵求逆,专门针对稀疏矩阵,就像在您的用例中一样(因为 Scipy 的矩阵求逆非常慢)。
  • \n
  • np.linalg.matrix_power(tmp0, 2) * c还可以在 Numba 中针对稀疏矩阵进行优化。
  • \n
\n

这是使用 Numba 的(几乎没有测试过的)实现:

\n
@nb.njit(\'(float64[:,::1], float64[::1])\', parallel=True)\ndef compute_ini(a, b):\n    n, m = a.shape\n    assert b.size == m and m > 0\n    res = np.empty(n, np.int64)\n    for i in nb.prange(n):\n        max_val, max_pos = a[i, 0] + b[0], 0\n        for j in range(1, m):\n            val = a[i, j] + b[j]\n            if val > max_val:\n                max_val = val\n                max_pos = j\n        res[i] = max_pos\n    return res\n\n@nb.njit(\'(float64[:,::1], float64[:,::1])\', parallel=True)\ndef max_abs_diff(a, b):\n    return np.max(np.absolute(a - b))\n\n# Utility function for invert_sparse_matrix\n@nb.njit\ndef invert_sparse_matrix_subkernel(b, out, i1, n, eps):\n    for i2 in range(n):\n        if i2 != i1:\n            scale = b[i2, i1]\n            if abs(scale) >= eps:\n                for j in range(n):\n                    b[i2, j] -= scale * b[i1, j]\n                    out[i2, j] -= scale * out[i1, j]\n\n@nb.njit(\'(float64[:,::1], float64[:,::1])\', parallel=True)\ndef invert_sparse_matrix(a, out):\n    eps = 1e-14\n    n, m = a.shape\n    assert n == m and out.shape == (n,n)\n\n    b = np.empty((n, n))\n\n    for i in nb.prange(n):\n        out[i, :i] = 0.0\n        out[i, i] = 1.0\n        out[i, i+1:] = 0.0\n        b[i, :] = a[i, :]\n\n    for i1 in range(n):\n        scale = 1.0 / b[i1, i1]\n        if abs(scale) < eps:\n            b[i1, :].fill(0.0)\n            out[i1, :].fill(0.0)\n            invert_sparse_matrix_subkernel(b, out, i1, n, eps)\n        elif abs(scale-1.0) < eps:\n            invert_sparse_matrix_subkernel(b, out, i1, n, eps)\n        else:\n            b[i1, :] *= scale\n            out[i1, :] *= scale\n            invert_sparse_matrix_subkernel(b, out, i1, n, eps)\n\n@nb.njit(\'(float64[:,::1], float64[:,::1], float64[:,::1])\', parallel=True)\ndef sparse_square_premult(a, premult, out):\n    eps = 1e-14\n    n, m = a.shape\n    assert n == m\n    assert premult.shape == (n, n)\n    assert out.shape == (n, n)\n\n    for i in nb.prange(n):\n        out[i, :] = 0.0\n        for j in range(n):\n            if abs(a[i, j]) >= eps:\n                for k in range(n):\n                    out[i, k] += a[i, j] * a[j, k]\n        out[i, :] *= premult[i, :]\n\ndef compute_numba():\n    # parameters\n    beta     =  0.98 \n    alpha    =  0.03\n    delta    =  0.1\n    T        =  1000\n    loop     =  1\n    dif      =  1\n    tol      =  1e-8\n\n    kss      =  ((1 / beta - (1 - delta)) / alpha)**(1 / (alpha - 1))\n    k        =  np.linspace(0.5 * kss, 1.8 * kss, T)\n\n    k_reshaped   =  k.reshape(-1, 1)\n    c            =  k_reshaped ** alpha + (1 - delta) * k_reshaped - k\n    c[c<0]       =  1e-11\n    c            =  np.log(c)\n    beta_square  =  beta**2\n\n    # multiplication\n    I   =  np.identity(T)\n    E   =  np.ones(T)[:,None]\n    Q2  =  I\n\n    J = np.empty((T, T))\n    tmp0 = np.empty((T, T))\n    tmp1 = np.empty((T, T))\n    tmp2 = np.empty((T, 1))\n    tmp3 = np.empty((T, 1))\n    tmp4 = np.empty((T, T))\n    B = np.empty((T, T))\n\n    while np.any(dif > tol) and loop < 200:\n        np.multiply(beta, Q2, out=J)\n        np.subtract(I, J, out=tmp0)\n        invert_sparse_matrix(tmp0, B)\n\n        Q3 = np.zeros((T,T))\n        np.multiply(J, c, out=tmp1)\n        np.matmul(tmp1, E, out=tmp2)\n        np.matmul(B, tmp2, out=tmp3)\n        ini = compute_ini(c, tmp3.flatten())\n        Q3[np.arange(T), ini] = 1\n\n        factor = 0.1 * 2 / beta_square\n        sparse_square_premult(tmp0, c, tmp4)\n        np.add(B, (factor * tmp3) @ (tmp2 + B @ (tmp4 @ E)).T, out=B)\n\n        dif = max_abs_diff(Q3, Q2)\n        kcQ = k[ini]\n\n        Q2 = Q3\n        loop += 1\n\ncompute_numba()\n
Run Code Online (Sandbox Code Playgroud)\n

请注意,该invert_sparse_matrix函数仍然需要大量时间(在我的机器上>50%),尽管它invsolve. 它是对朴素求逆算法的改进,对非常稀疏的矩阵几乎没有优化。它当然可以进一步优化(例如使用平铺),但这肯定不是一件容易的事(特别是对于新手程序员来说)。

\n

请注意,编译时间需要几秒钟。

\n

总体而言,此实现比我的 i5-9600KF 处理器(6 核)上的初始实现快约 4~5 倍。

\n