numpy:高效,大点产品

Dat*_*chy 4 python performance numpy

我正在尝试执行大型线性代数计算,以将通用协方差矩阵KK_l_obs(形状(NL, NL))转换为缩小空间Kmap_PC(形状(q, q, X, Y))中的协方差矩阵的映射.

有关如何Kmap_PC为每个空间位置构造的信息保存在其他数组aI0,和k_l_th.前两个有形状(X, Y),第三个有形状(nl, nl).观察到的和缩小的空间之间的转换由eingenvectors E(形状(q, nl))传递.请注意NL> nl.

空间元素Kmap_PC计算如下:

Kmap_PC[..., X, Y] = E.dot(
    KK_l_obs[I0[X, Y]: I0[X, Y] + nl,
             I0[X, Y]: I0[X, Y] + nl] / a_map[X, Y] + \
    k_l_th).dot(E.T)
Run Code Online (Sandbox Code Playgroud)

理论上,第一个点积内的位可以直接使用np.einsum,但会占用数百GB的内存.我现在正在做的是循环空间索引Kmap_PC,这很慢.我也可以使用MPI分配计算(这可能会提供3-4倍的加速,因为我有16个核心可用).

我在想:

(a)如果我能更有效地进行计算 - 或许明确地将其分解为空间元素组; 和

(b)如果我可以改善那些计算的内存开销.

代码段

import numpy as np
np.random.seed(1)

X = 10
Y = 10
NL = 3000
nl = 1000
q = 7

a_map = 5. * np.random.rand(X, Y)
E = np.random.randn(q, nl)

# construct constant component
m1_ = .05 * np.random.rand(nl, nl)
k_l_th = m1_.dot(m1_)

# construct variable component
m2_ = np.random.rand(NL, NL)
KK_l_obs = m2_.dot(m2_.T)

# where to start in big cov
I0 = np.random.randint(0, NL - nl, (X, Y))

# the slow way
def looping():
    K_PC = np.empty((q, q, X, Y))
    inds = np.ndindex((X, Y))

    for si in inds:
        I0_ = I0[si[0], si[1]]
        K_PC[..., si[0], si[1]] = E.dot(
            KK_l_obs[I0_ : I0_ + nl, I0_ : I0_ + nl] / a_map[si[0], si[1]] + k_l_th).dot(E.T)

    return K_PC

def veccalc():
    nl_ = np.arange(nl)[..., None, None]
    I, J = np.meshgrid(nl_, nl_)

    K_s = KK_l_obs[I0[..., None, None] + J, I0[..., None, None] + I]
    K_s = K_s / a_map[..., None, None] + k_l_th[None, None, ...]
    print(K_s.nbytes)

    K_PC = E @ K_s @ E.T
    K_PC = np.moveaxis(K_PC, [0, 1], [-2, -1])

    return K_PC
Run Code Online (Sandbox Code Playgroud)

Div*_*kar 5

调整#1

在NumPy中被忽略的一个非常简单的性能调整是避免使用除法和使用乘法.在处理相同形状的数组时,在处理标量到数组或数组到数组时,这是不明显的.但NumPy的隐式广播使得对于允许在不同形状的阵列之间或阵列和标量之间进行广播的分区感兴趣.对于这些情况,我们可以使用乘法与倒数来获得明显的提升.因此,对于所述问题,我们将预先计算其倒数a_map并将其用于乘法而不是除法.

所以,在开始时做:

r_a_map = 1.0/a_map
Run Code Online (Sandbox Code Playgroud)

然后,在嵌套循环中,将其用作:

KK_l_obs[I0_ : I0_ + nl, I0_ : I0_ + nl] * r_a_map[si[0], si[1]]
Run Code Online (Sandbox Code Playgroud)

调整#2

我们可以使用associative乘法的属性:

A*(B + C) = A*B + A*C
Run Code Online (Sandbox Code Playgroud)

因此,k_l_th这可以在所有迭代中求和,但保持不变可以在循环之外取出并在离开嵌套循环后求和.它的有效总结是:E.dot(k_l_th).dot(E.T).所以,我们会添加这个K_PC.


完成和基准测试

使用调整#1和调整#2,我们最终会采用修改后的方法,如此 -

def original_mod_app():
    r_a_map = 1.0/a_map
    K_PC = np.empty((q, q, X, Y))
    inds = np.ndindex((X, Y))
    for si in inds:
        I0_ = I0[si[0], si[1]]
        K_PC[..., si[0], si[1]] = E.dot(
            KK_l_obs[I0_ : I0_ + nl, I0_ : I0_ + nl] * \
            r_a_map[si[0], si[1]]).dot(E.T)
    return K_PC + E.dot(k_l_th).dot(E.T)[:,:,None,None]
Run Code Online (Sandbox Code Playgroud)

运行时测试使用与问题中使用的相同的样本设置 -

In [458]: %timeit original_app()
1 loops, best of 3: 1.4 s per loop

In [459]: %timeit original_mod_app()
1 loops, best of 3: 677 ms per loop

In [460]: np.allclose(original_app(), original_mod_app())
Out[460]: True
Run Code Online (Sandbox Code Playgroud)

所以,我们正在加快速度2x+.