Int*_*ral 1 performance numpy linear-algebra matrix-multiplication
我的问题如下,我有一个迭代算法,这样在每次迭代时它需要执行几个矩阵乘法点(A_i,B_i),对于 i = 1 ... k。由于这些乘法是用 Numpy 的点执行的,我知道他们正在调用 BLAS-3 实现,这非常快。问题是调用数量巨大,结果证明是我程序中的瓶颈。我想通过制作更少的产品但使用更大的矩阵来最小化所有这些调用的开销。
为简单起见,考虑所有矩阵都是 nxn(通常 n 不大,范围在 1 到 1000 之间)。解决我的问题的一种方法是考虑块对角矩阵 diag( A_i ) 并执行下面的乘积。
这只是对函数 dot 的一次调用,但现在程序浪费了很多时间执行与零的乘法。这个想法似乎不起作用,但它给出了结果 [ A_1 B_1 , ..., A_k B_k ],即所有产品堆叠在一个大矩阵中。
我的问题是,有没有办法通过单个函数调用计算 [ A_1 B_1 , ..., A_k B_k ] ?或者更重要的是,如何比制作 Numpy 点循环更快地计算这些产品?
编辑
对于较大的 nxn 矩阵(大约 20 大小),来自编译代码的 BLAS 调用更快,对于较小的矩阵,自定义 Numba 或 Cython 内核通常更快。
以下方法为给定的输入形状生成自定义点函数。通过这种方法,还可以从与编译器相关的优化中受益,例如循环展开,这对于小矩阵尤其重要。
必须注意的是,生成和编译一个内核大约需要。1s,因此请确保仅在确实需要时才调用生成器。
发电机功能
def gen_dot_nm(x,y,z):
#small kernels
@nb.njit(fastmath=True,parallel=True)
def dot_numba(A,B):
"""
calculate dot product for (x,y)x(y,z)
"""
assert A.shape[0]==B.shape[0]
assert A.shape[2]==B.shape[1]
assert A.shape[1]==x
assert B.shape[1]==y
assert B.shape[2]==z
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
for i in range(x):
for j in range(z):
acc=0.
for k in range(y):
acc+=A[ii,i,k]*B[ii,k,j]
res[ii,i,j]=acc
return res
#large kernels
@nb.njit(fastmath=True,parallel=True)
def dot_BLAS(A,B):
assert A.shape[0]==B.shape[0]
assert A.shape[2]==B.shape[1]
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
res[ii]=np.dot(A[ii],B[ii])
return res
#At square matices above size 20
#calling BLAS is faster
if x>=20 or y>=20 or z>=20:
return dot_BLAS
else:
return dot_numba
Run Code Online (Sandbox Code Playgroud)
使用示例
A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)
dot22=gen_dot_nm(2,2,2)
X=dot22(A,B)
%timeit X3=dot22(A,B)
#5.94 µs ± 21.3 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
Run Code Online (Sandbox Code Playgroud)
旧答案
另一种选择,但需要做更多的工作,是使用一些特殊的 BLAS 实现,它可以及时为非常小的矩阵创建自定义内核,而不是从 C 调用这个内核。
例子
import numpy as np
import numba as nb
#Don't use this for larger submatrices
@nb.njit(fastmath=True,parallel=True)
def dot(A,B):
assert A.shape[0]==B.shape[0]
assert A.shape[2]==B.shape[1]
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
for i in range(A.shape[1]):
for j in range(B.shape[2]):
acc=0.
for k in range(B.shape[1]):
acc+=A[ii,i,k]*B[ii,k,j]
res[ii,i,j]=acc
return res
@nb.njit(fastmath=True,parallel=True)
def dot_22(A,B):
assert A.shape[0]==B.shape[0]
assert A.shape[1]==2
assert A.shape[2]==2
assert B.shape[1]==2
assert B.shape[2]==2
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
res[ii,0,0]=A[ii,0,0]*B[ii,0,0]+A[ii,0,1]*B[ii,1,0]
res[ii,0,1]=A[ii,0,0]*B[ii,0,1]+A[ii,0,1]*B[ii,1,1]
res[ii,1,0]=A[ii,1,0]*B[ii,0,0]+A[ii,1,1]*B[ii,1,0]
res[ii,1,1]=A[ii,1,0]*B[ii,0,1]+A[ii,1,1]*B[ii,1,1]
return res
Run Code Online (Sandbox Code Playgroud)
时间安排
A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)
X=A@B
X2=np.einsum("xik,xkj->xij",A,B)
X3=dot_22(A,B) #avoid measurig compilation overhead
X4=dot(A,B) #avoid measurig compilation overhead
%timeit X=A@B
#262 µs ± 2.55 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit np.einsum("xik,xkj->xij",A,B,optimize=True)
#264 µs ± 3.22 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit X3=dot_22(A,B)
#5.68 µs ± 27.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
%timeit X4=dot(A,B)
#9.79 µs ± 61.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
Run Code Online (Sandbox Code Playgroud)