使用步幅进行有效的移动平均滤波器

Ben*_*min 29 python numpy image-processing filter

我最近在这篇文章答案中学到了大步,并且想知道如何使用它们来比我在本文中提出的更有效地计算移动平均滤波器(使用卷积滤波器).

这就是我到目前为止所拥有的.它接受原始数组的视图然后将其滚动必要的量并将内核值相加以计算平均值.我知道边缘没有正确处理,但我可以在以后处理...有更好更快的方法吗?目标是过滤大到5000x5000 x 16层的大型浮点阵列,这个任务scipy.ndimage.filters.convolve相当慢.

请注意,我正在寻找8邻居连接,即3x3滤镜取9个像素的平均值(焦点像素周围8个),并将该值分配给新图像中的像素.

import numpy, scipy

filtsize = 3
a = numpy.arange(100).reshape((10,10))
b = numpy.lib.stride_tricks.as_strided(a, shape=(a.size,filtsize), strides=(a.itemsize, a.itemsize))
for i in range(0, filtsize-1):
    if i > 0:
        b += numpy.roll(b, -(pow(filtsize,2)+1)*i, 0)
filtered = (numpy.sum(b, 1) / pow(filtsize,2)).reshape((a.shape[0],a.shape[1]))
scipy.misc.imsave("average.jpg", filtered)
Run Code Online (Sandbox Code Playgroud)

编辑关于我如何看待这个工作的澄清:

当前代码:

  1. 使用stride_tricks生成一个类似[[0,1,2],[1,2,3],[2,3,4] ...]的数组,它对应于过滤器内核的顶行.
  2. 沿垂直轴滚动以获得内核[[10,11,12],[11,12,13],[13,14,15] ...]的中间行并将其添加到我得到的数组中1)
  3. 重复以获得内核的最后一行[[20,21,22],[21,22,23],[22,23,24] ...].此时,我取每行的总和除以滤波器中的元素数量,给出每个像素的平均值,(移动1行和1列,边缘有一些奇怪,但我可以稍后再照顾).

我希望的是更好地使用stride_tricks直接获取9个值或内核元素的总和,对于整个数组,或者有人可以说服我另一个更有效的方法......

Joe*_*ton 28

对于它的价值,这里是你如何使用"花哨的"跨步技巧来做到这一点.我昨天要发布这个帖子,但实际工作让我分心了!:)

@Paul和@eat都有很好的实现,使用其他各种方法.只是为了继续前面的问题,我想我会发布N维等价物.

但是,你无法显着击败scipy.ndimage> 1D阵列的功能.(scipy.ndimage.uniform_filter应该打败scipy.ndimage.convolve)

此外,如果您试图获得一个多维移动窗口,那么每当您无意中制作阵列副本时,就有可能导致内存使用量爆炸.虽然最初的"滚动"数组只是原始数组内存的一个视图,但复制数组的任何中间步骤都会产生比原始数组大几个数量级的副本(即假设你正在使用它一个100x100的原始数组...进入它的视图(对于(3,3)的过滤器大小)将是98x98x3x3但使用与原始相同的内存.但是,任何副本将使用完整 98x98x3x3阵列的内存量将!!)

基本上,当您想要在ndarray 的单个轴上矢量化移动窗口操作时,使用疯狂的跨步技巧非常有用.它可以很容易地计算诸如移动标准偏差之类的东西,而且开销很小.当你想沿着多个轴开始这样做时,它是可能的,但你通常会有更专业的功能.(如scipy.ndimage等)

无论如何,这是你如何做到的:

import numpy as np

def rolling_window_lastaxis(a, window):
    """Directly taken from Erik Rigtorp's post to numpy-discussion.
    <http://www.mail-archive.com/numpy-discussion@scipy.org/msg29450.html>"""
    if window < 1:
       raise ValueError, "`window` must be at least 1."
    if window > a.shape[-1]:
       raise ValueError, "`window` is too long."
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

def rolling_window(a, window):
    if not hasattr(window, '__iter__'):
        return rolling_window_lastaxis(a, window)
    for i, win in enumerate(window):
        if win > 1:
            a = a.swapaxes(i, -1)
            a = rolling_window_lastaxis(a, win)
            a = a.swapaxes(-2, i)
    return a

filtsize = (3, 3)
a = np.zeros((10,10), dtype=np.float)
a[5:7,5] = 1

b = rolling_window(a, filtsize)
blurred = b.mean(axis=-1).mean(axis=-1)
Run Code Online (Sandbox Code Playgroud)

所以我们得到的b = rolling_window(a, filtsize)是一个8x8x3x3阵列,它实际上是与原始10x10阵列相同的内存视图.我们可以像沿着不同的轴一样容易地使用不同的滤波器尺寸,或者仅沿着N维阵列的选定轴操作(即filtsize = (0,3,0,3),在4维阵列上将给出6维视图).

然后,我们可以重复地将任意函数应用于最后一个轴,以有效地计算移动窗口中的事物.

但是,因为我们在每个步骤mean(std或者其他)上存储比我们原始数组大得多的临时数组,所以这根本不是内存效率!它也不会非常快.

相当于ndimage:

blurred = scipy.ndimage.uniform_filter(a, filtsize, output=a)
Run Code Online (Sandbox Code Playgroud)

这将处理各种边界条件,就地"模糊"而不需要数组的临时副本,并且速度非常快.跨步技巧是将功能应用于沿一个轴移动窗口的好方法,但它们不是沿多个轴执行它的好方法,通常....

只要我0.02美元,无论如何......

  • 非常好的说法:"跨步技巧是将功能应用于沿一个轴移动的窗口的好方法,但它们不是沿多个轴执行它的好方法,通常......".当然,你对记忆'爆炸'的解释很重要.从你的回答(至少对我来说)的总结是:'不要走太远的钓鱼,quarenteed捕获已经scipy'.谢谢 (3认同)
  • 是否可以指定步长? (2认同)

Jon*_*nas 8

我不熟悉Python为此编写代码,但加速卷积的两种最佳方法是分离滤波器或使用傅里叶变换.

分离滤波器:卷积为O(M*N),其中M和N分别是图像和滤波器中的像素数.由于使用3乘3内核的平均过滤相当于先使用3乘1内核进行过滤,然后使用1乘3内核进行过滤,因此(3+3)/(3*3)通过连续卷积使用2到1 可以获得~~ 30%的速度提升d内核(随着内核变大,这显然会变得更好).当然,你仍然可以在这里使用大步技巧.

傅里叶变换:conv(A,B)相当于ifft(fft(A)*fft(B)),即直接空间中的卷积成为傅立叶空间中的乘法,其中A是您的图像并且B是您的滤波器.由于傅里叶变换的(逐元素)乘法要求A和B的大小相同,B是一个数组,size(A)其中你的内核位于图像的正中心,而其他地方都是零.要在数组的中心放置一个3乘3的内核,您可能需要填充A到奇数大小.根据您的傅里叶变换的实现,这可能比卷积快得多(如果您多次应用相同的滤波器,您可以预先计算fft(B),节省另外30%的计算时间).

  • 对于它的价值,在python中,它们分别在`scipy.ndimage.uniform_filter`和`scipy.signal.fftconvolve`中实现. (4认同)

eat*_*eat 5

让我们来看看:

你的问题不是很清楚,但我现在假设你会想显着提高这种平均水平。

import numpy as np
from numpy.lib import stride_tricks as st

def mf(A, k_shape= (3, 3)):
    m= A.shape[0]- 2
    n= A.shape[1]- 2
    strides= A.strides+ A.strides
    new_shape= (m, n, k_shape[0], k_shape[1])
    A= st.as_strided(A, shape= new_shape, strides= strides)
    return np.sum(np.sum(A, -1), -1)/ np.prod(k_shape)

if __name__ == '__main__':
    A= np.arange(100).reshape((10, 10))
    print mf(A)
Run Code Online (Sandbox Code Playgroud)

现在,您实际上期望什么样的性能改进?

更新:
首先,警告:当前状态的代码无法正确适应“内核”形状。然而,这不是我现在主要关心的问题(无论如何,如何正确适应的想法已经存在)。

我刚刚直观地选择了 4D A 的新形状,对我来说,考虑以原始 2D A 的每个网格位置为中心的 2D“内核”中心真的很有意义。

但这种 4D 造型实际上可能不是“最好的”。我认为这里真正的问题是求和的性能。为了充分利用您的机器缓存架构,您应该能够找到(4D A 的)“最佳顺序”。但是,对于与您的机器缓存“合作”的“小型”阵列和那些与您的机器缓存“合作”的大型阵列,该顺序可能不同(至少不是那么简单的方式)。

更新 2:
这是mf. 显然,最好先重塑为 3D 数组,然后再进行求和而不是求和,而只是进行点积(这具有所有优点,该内核可以是任意的)。然而,它仍然比 Pauls 更新的函数慢 3 倍(在我的机器上)。

def mf(A):
    k_shape= (3, 3)
    k= np.prod(k_shape)
    m= A.shape[0]- 2
    n= A.shape[1]- 2
    strides= A.strides* 2
    new_shape= (m, n)+ k_shape
    A= st.as_strided(A, shape= new_shape, strides= strides)
    w= np.ones(k)/ k
    return np.dot(A.reshape((m, n, -1)), w)
Run Code Online (Sandbox Code Playgroud)