我有一个二维数字数组,想对每一行中的不同索引求平均值。说我有
import numpy as np
data = np.arange(16).reshape(4, 4)
Run Code Online (Sandbox Code Playgroud)
[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]
[12 13 14 15]]
Run Code Online (Sandbox Code Playgroud)
我有两个列表,为每一行指定第一个(包含)和最后一个(不包含)索引:
start = [1, 0, 1, 2]
end = [2, 1, 3, 4]
Run Code Online (Sandbox Code Playgroud)
然后我想实现这一点:
result = []
for i in range(4):
result.append(np.sum(data[i, start[i]:end[i]]))
Run Code Online (Sandbox Code Playgroud)
这使
[1, 4, 19, 29]
Run Code Online (Sandbox Code Playgroud)
但是,我使用的数组比这个例子中的要大很多,所以这个方法对我来说太慢了。有什么聪明的方法可以避免这个循环吗?
我的第一个想法是展平阵列。然后,我想,人们需要以某种方式制作一个切片列表并将其并行应用于数组,我不知道该怎么做。
否则,我正在考虑使用np.apply_along_axis但我认为这仅适用于函数?
让我们带着你的疯狂想法奔跑吧。您可以将数组的索引转换为散乱的索引,如下所示:
ind = np.stack((start, end), axis=0)
ind += np.arange(data.shape[0]) * data.shape[1]
ind = ind.ravel(order='F')
if ind[-1] == data.size:
ind = ind[:-1]
Run Code Online (Sandbox Code Playgroud)
现在你可以解开原始数组,并add.reduceat在这样定义的段上:
np.add.reduceat(data.ravel(), ind)[::2] / np.subtract(end, start)
Run Code Online (Sandbox Code Playgroud)
TL; 博士
def row_mean(data, start, end):
ind = np.stack((start, end), axis=0)
ind += np.arange(data.shape[0]) * data.shape[1]
ind = ind.ravel(order='F')
if ind[-1] == data.size:
ind = ind[:-1]
return np.add.reduceat(data.ravel(), ind)[::2] / np.subtract(end, start)
Run Code Online (Sandbox Code Playgroud)
时间安排
使用@Divakar's answer 中显示的完全相同的数组,我们得到以下结果(当然特定于我的机器):
%timeit einsum_mean(data, start, end)
261 ms ± 2.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit broadcasting_mean(data, start, end)
405 ms ± 1.64 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit ragged_mean(data, start, end)
520 ms ± 3.68 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit row_mean(data, start, end)
45.6 ms ± 708 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Run Code Online (Sandbox Code Playgroud)
有点令人惊讶的是,尽管通过将感兴趣区域之间的所有数字相加做了很多额外的工作,但这种方法的运行速度比所有其他方法快 5-10 倍。原因可能是它的开销极低:索引数组很小,并且它只对一维数组进行一次传递。