numpy 在另一个数组中创建最大连续对的数组

Gal*_*zky 15 python numpy max-pooling

我有一个 numpy 数组:

A = np.array([8, 2, 33, 4, 3, 6])
Run Code Online (Sandbox Code Playgroud)

我想要的是创建另一个数组 B,其中每个元素都是 A 中 2 个连续对的成对最大值,所以我得到:

B = np.array([8, 33, 33, 4, 6])
Run Code Online (Sandbox Code Playgroud)

关于如何实施的任何想法?
关于如何为超过 2 个元素实现这一点的任何想法?(同样的事情,但对于连续的 n 个元素)

编辑:

答案给了我解决这个问题的方法,但是对于 n 大小的窗口情况,是否有更有效的方法不需要循环?

编辑2:

事实证明,该问题等同于询问如何对具有大小为 n 的窗口的列表执行 1d 最大池化。有谁知道如何有效地实现这一点?

Gal*_*zky 9

成对问题的一种解决方案是使用np.maximum函数和数组切片:

B = np.maximum(A[:-1], A[1:])
Run Code Online (Sandbox Code Playgroud)


Nic*_*ais 7

无循环解决方案是max在由skimage.util.view_as_windows以下人员创建的窗口上使用:

list(map(max, view_as_windows(A, (2,))))
Run Code Online (Sandbox Code Playgroud)
[8, 33, 33, 4, 6]
Run Code Online (Sandbox Code Playgroud)

复制/粘贴示例:

import numpy as np
from skimage.util import view_as_windows

A = np.array([8, 2, 33, 4, 3, 6])

list(map(max, view_as_windows(A, (2,))))
Run Code Online (Sandbox Code Playgroud)


Div*_*kar 5

在这个问答中,我们基本上要求滑动最大值。之前已经探索过-Max 在 NumPy 数组中的滑动窗口中。因为,我们希望提高效率,所以我们可以看得更远。其中之一是numba,这里有两个最终变体,我最终使用了杠杆parallel指令,该指令比没有版本提高了性能:

import numpy as np
from numba import njit, prange

@njit(parallel=True)
def numba1(a, W):
    L = len(a)-W+1
    out = np.empty(L, dtype=a.dtype)
    v = np.iinfo(a.dtype).min
    for i in prange(L):
        max1 = v
        for j in range(W):
            cur = a[i + j]
            if cur>max1:
                max1 = cur                
        out[i] = max1
    return out 

@njit(parallel=True)
def numba2(a, W):
    L = len(a)-W+1
    out = np.empty(L, dtype=a.dtype)
    for i in prange(L):
        for j in range(W):
            cur = a[i + j]
            if cur>out[i]:
                out[i] = cur                
    return out 
Run Code Online (Sandbox Code Playgroud)

从较早的链接问答中,等效的 SciPy 版本将是 -

from scipy.ndimage.filters import maximum_filter1d

def scipy_max_filter1d(a, W):
    L = len(a)-W+1
    hW = W//2 # Half window size
    return maximum_filter1d(a,size=W)[hW:hW+L]
Run Code Online (Sandbox Code Playgroud)

基准测试

其他发布的通用窗口 arg 工作方法:

from skimage.util import view_as_windows

def rolling(a, window):
    shape = (a.size - window + 1, window)
    strides = (a.itemsize, a.itemsize)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

# @mathfux's soln
def npmax_strided(a,n):
    return np.max(rolling(a, n), axis=1)

# @Nicolas Gervais's soln
def mapmax_strided(a, W):
    return list(map(max, view_as_windows(a,W)))

cummax = np.maximum.accumulate
def pp(a,w):
    N = a.size//w
    if a.size-w+1 > N*w:
        out = np.empty(a.size-w+1,a.dtype)
        out[:-1] = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-1:-1]
        out[-1] = a[w*N:].max()
    else:
        out = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-2:-1]
    out[1:N*w-w+1] = np.maximum(out[1:N*w-w+1],
                            cummax(a[w:w*N].reshape(N-1,w),axis=1).ravel())
    out[N*w-w+1:] = np.maximum(out[N*w-w+1:],cummax(a[N*w:]))
    return out
Run Code Online (Sandbox Code Playgroud)

使用benchit包(很少有基准测试工具打包在一起;免责声明:我是它的作者)对提议的解决方案进行基准测试。

import benchit
funcs = [mapmax_strided, npmax_strided, numba1, numba2, scipy_max_filter1d, pp]
in_ = {(n,W):(np.random.randint(0,100,n),W) for n in 10**np.arange(2,6) for W in [2, 10, 20, 50, 100]}
t = benchit.timings(funcs, in_, multivar=True, input_name=['Array-length', 'Window-length'])
t.plot(logx=True, sp_ncols=1, save='timings.png')
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

因此,numba 非常适合小于 的窗口大小10,在这种情况下,没有明显的赢家,而在较大的窗口大小上pp,SciPy 会胜出,排在第二位。


Pau*_*zer 5

这是一种专门为较大窗户量身定制的方法。窗口大小为 O(1),数据大小为 O(n)。

我已经完成了一个纯 numpy 和一个 pythran 实现。

我们如何在窗口大小上实现 O(1)?我们使用“锯齿”技巧:如果 w 是窗口宽度,我们将数据分组为多个 w,对于每组,我们从左到右和从右到左进行累积最大值。任何中间窗口的元素分布在两个组上,交叉点的最大值在我们之前计算的累积最大值中。所以我们每个数据点总共需要 3 次比较。

benchit(感谢@Divakar) w=100;我的函数是 pp (numpy) 和 winmax (pythran):

在此处输入图片说明

对于小窗口尺寸 w=5,图片更均匀。有趣的是,即使对于非常小的尺寸,pythran 仍然具有巨大的优势。他们必须做一些正确的事情来最小化呼叫开销。

在此处输入图片说明

蟒蛇代码:

cummax = np.maximum.accumulate
def pp(a,w):
    N = a.size//w
    if a.size-w+1 > N*w:
        out = np.empty(a.size-w+1,a.dtype)
        out[:-1] = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-1:-1]
        out[-1] = a[w*N:].max()
    else:
        out = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-2:-1]
    out[1:N*w-w+1] = np.maximum(out[1:N*w-w+1],
                            cummax(a[w:w*N].reshape(N-1,w),axis=1).ravel())
    out[N*w-w+1:] = np.maximum(out[N*w-w+1:],cummax(a[N*w:]))
    return out
Run Code Online (Sandbox Code Playgroud)

pythran 版本;编译pythran -O3 <filename.py>; 这将创建一个编译模块,您可以导入它:

import numpy as np

# pythran export winmax(float[:],int)
# pythran export winmax(int[:],int)

def winmax(data,winsz):
    N = data.size//winsz
    if N < 1:
        raise ValueError
    out = np.empty(data.size-winsz+1,data.dtype)
    nxt = winsz
    for j in range(winsz,data.size):
        if j == nxt:
            nxt += winsz
            out[j+1-winsz] = data[j]
        else:
            out[j+1-winsz] = out[j-winsz] if out[j-winsz]>data[j] else data[j]
    running = data[-winsz:N*winsz].max()
    nxt -= winsz << (nxt > data.size)
    for j in range(data.size-winsz,0,-1):
        if j == nxt:
            nxt -= winsz
            running = data[j-1]
        else:
            running = data[j] if data[j] > running else running
            out[j] = out[j] if out[j] > running else running
    out[0] = data[0] if data[0] > running else running
    return out
Run Code Online (Sandbox Code Playgroud)