移动平均线或平均线

She*_*284 164 python numpy matplotlib scipy python-2.7

是否有一个scipy函数或numpy函数或模块用于python,在给定特定窗口的情况下计算一维数组的运行平均值?

lap*_*pis 214

UPD:Alleojasaarim提出了更有效的解决方案.


你可以用np.convolve它:

np.convolve(x, np.ones((N,))/N, mode='valid')
Run Code Online (Sandbox Code Playgroud)

说明

运行平均值是卷积数学运算的一个例子.对于运行平均值,您沿着输入滑动窗口并计算窗口内容的平均值.对于离散的1D信号,卷积是相同的,除了代替计算任意线性组合的平均值,即将每个元素乘以相应的系数并将结果相加.那些系数,一个用于窗口中的每个位置,有时称为卷积.现在,N值的算术平均值是(x_1 + x_2 + ... + x_N) / N,所以相应的内核是(1/N, 1/N, ..., 1/N),这正是我们通过使用得到的np.ones((N,))/N.

边缘

mode的参数np.convolve指定如何处理边缘.我在valid这里选择了这种模式,因为我认为这是大多数人对运行方式的期望,但你可能还有其他优先事项.这是一个说明模式之间差异的图:

import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
    plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()
Run Code Online (Sandbox Code Playgroud)

运行平均卷积模式

  • 我喜欢这个解决方案,因为它是干净的(一行)和_relatively_高效(在numpy内完成的工作).但使用`numpy.cumsum`的Alleo的"高效解决方案"具有更好的复杂性. (4认同)
  • @denfromufa,我相信文档很好地涵盖了实现,它还链接到维基百科,它解释了数学.考虑到问题的重点,您认为这个答案需要复制吗? (2认同)

All*_*leo 135

高效的解决方案

卷积比直接的方法好得多,但(我猜)它使用FFT,因此非常慢.但是,特别是为计算运行平均值,以下方法工作正常

def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)
Run Code Online (Sandbox Code Playgroud)

要检查的代码

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop
Run Code Online (Sandbox Code Playgroud)

注意numpy.allclose(result1, result2)True,这两种方法是等效的.N越大,时间差异越大.

  • 很好的解决方案,但请注意,它可能会遇到大型数组的数字错误,因为在数组末尾,您可能会减去两个大数字以获得较小的结果. (6认同)
  • 如果x包含浮点数,则此解决方案的数值稳定性可能会成为问题。例如:`running_mean(np.arange(int(1e7))[::-1] + 0.2,1)[-1]-0.2`返回`0.003125`,而人们期望的是`0.0`。更多信息:https://en.wikipedia.org/wiki/Loss_of_significance (4认同)
  • 好的解决方案 我的预感是`numpy.convolve`是O(mn); 它的[docs](http://docs.scipy.org/doc/numpy/reference/generated/numpy.convolve.html)提到`scipy.signal.fftconvolve`使用FFT. (3认同)
  • 这个方法不处理数组的边缘,是吗? (3认同)
  • 这使用整数除法而不是浮点除法:`running_mean([1,2,3], 2)` 给出 `array([1, 2])`。用 `[float(value) for value in x]` 替换 `x` 可以解决问题。 (2认同)

jas*_*rim 70

更新:下面的示例显示了pandas.rolling_mean在最近版本的pandas中删除的旧功能.下面的函数调用的现代等价物将是

In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]: 
array([ 0.49815397,  0.49844183,  0.49840518, ...,  0.49488191,
        0.49456679,  0.49427121])
Run Code Online (Sandbox Code Playgroud)

熊猫比NumPy或SciPy更适合这种情况.它的功能rolling_mean可以方便地完成工作.当输入是数组时,它还返回NumPy数组.

rolling_mean使用任何自定义纯Python实现很难超越性能.以下是针对两个建议解决方案的示例性能:

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: def running_mean(x, N):
   ...:     cumsum = np.cumsum(np.insert(x, 0, 0)) 
   ...:     return (cumsum[N:] - cumsum[:-N]) / N
   ...:

In [4]: x = np.random.random(100000)

In [5]: N = 1000

In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop

In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop

In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop

In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True
Run Code Online (Sandbox Code Playgroud)

关于如何处理边缘值也有很好的选择.

  • Pandas rolling_mean是一个很好的工具,但已被弃用ndarrays.在未来的Pandas版本中,它只能在Pandas系列上运行.我们现在在哪里转向非Pandas阵列数据? (6认同)
  • [%timeit bottleneck.move_mean(x,N)`](https://github.com/kwgoodman/bottleneck)比我电脑上的cumsum和pandas方法快3到15倍.在repo的[自述文件](https://github.com/kwgoodman/bottleneck/blob/master/README.rst)中查看他们的基准. (6认同)
  • @Mike rolling_mean()已被弃用,但现在你可以单独使用rolling和mean:`df.rolling(windowsize).mean()`现在可以正常工作(很快我可能会添加).6,000行系列`%timeit test1.rolling(20).mean()`返回_1000循环,最佳3:1.16 ms每循环_ (4认同)
  • @Vlox`df.rolling()`运行良好,问题是即使这种形式将来也不支持ndarrays.要使用它,我们必须首先将数据加载到Pandas Dataframe中.我希望看到这个函数添加到`numpy`或`scipy.signal`. (4认同)

mtr*_*trw 49

您可以使用以下公式计算运行平均值

import numpy as np

def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
         y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N
Run Code Online (Sandbox Code Playgroud)

但它很慢.

幸运的是,numpy包含一个convolve函数,我们可以使用它来加快速度.运行平均值相当于x使用N长的向量进行卷积,所有成员都等于1/N.卷积的n​​umpy实现包括起始瞬态,所以你必须删除前N-1个点:

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]
Run Code Online (Sandbox Code Playgroud)

在我的机器上,快速版本的速度提高了20-30倍,具体取决于输入矢量的长度和平均窗口的大小.

请注意,convolve确实包含一个'same'模式,它似乎应该解决启动瞬态问题,但它会在开始和结束之间进行分割.


Aik*_*ude 20

对于一个简短,快速的解决方案,它在一个循环中完成整个过程,没有依赖关系,下面的代码非常有用.

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []

for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i>=N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        #can do stuff with moving_ave here
        moving_aves.append(moving_ave)
Run Code Online (Sandbox Code Playgroud)

  • 快速?!该解决方案比使用Numpy的解决方案慢几个数量级. (15认同)
  • 虽然这个本机解决方案很酷,但 OP 要求使用 numpy/scipy 函数 - 大概这些函数会快得多。 (4认同)
  • 这究竟是如何工作的?我很乐意使用 (2认同)

lit*_*nce 19

或计算的python模块

在Tradewave.net的测试中,TA-lib总是获胜:

import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t

PAIR = info.primary_pair
PERIOD = 30

def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])

def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period

def pandas_sma(array, period):
    return pd.rolling_mean(array, period)

def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))

def talib_sma(array, period):
    return ta.MA(array, period)

def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')

def fftconvolve_sma(array, period):    
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')    

def tick():

    close = data[PAIR].warmup_period('close')

    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()

    storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6-t5    
    storage.elapsed[-6] = storage.elapsed[-6] + t7-t6        

    plot('sma_api', sma_api)  
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])    
    plot('sma_fftconvolve', sma_fftconvolve[-25])

def stop():

    log('ticks....: %s' % info.max_ticks)

    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])    
    log('fft......: %.5f' % storage.elapsed[-6])
Run Code Online (Sandbox Code Playgroud)

结果:

[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700  # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

  • 看起来你的时间序列在平滑后发生了变化,这是预期的效果吗? (3认同)

Han*_*ann 18

有关即用型解决方案,请参阅https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html.它提供flat窗口类型的运行平均值.请注意,这比简单的自己动手的卷积方法要复杂一些,因为它试图通过反映它来处理数据开头和结尾的问题(在您的情况下可能会或可能不会. ..).

首先,您可以尝试:

a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)
Run Code Online (Sandbox Code Playgroud)

  • 当输入和输出具有相同性质(例如,两个时间信号)时,我总是对信号处理功能感到烦恼,该信号处理功能返回与输入信号不同形状的输出信号.它打破了与相关自变量(例如,时间,频率)的对应关系,使得绘图或比较不是直接的事情......无论如何,如果你分享这种感觉,你可能想要将所提出的函数的最后几行改为y = np .convolve(W/w.sum()中,s,模式= '相同'); return y [window_len-1 :-( window_len-1)] (7认同)

NeX*_*XuS 15

我知道这是一个老问题,但这是一个不使用任何额外数据结构或库的解决方案.它在输入列表的元素数量上是线性的,我无法想到任何其他方式来提高它的效率(实际上,如果有人知道更好的方法来分配结果,请告诉我).

注意:使用numpy数组而不是列表会更快,但我想消除所有依赖项.通过多线程执行也可以提高性能

该函数假定输入列表是一维的,所以要小心.

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)

    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)

    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result
Run Code Online (Sandbox Code Playgroud)


moi*_*moi 11

如果保持输入的尺寸很重要(而不是将输出限制在'valid'卷积区域),则可以使用scipy.ndimage.filters.uniform_filter1d:

import numpy as np
from scipy.ndimage.filters import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

y.shape == x.shape
>>> True
Run Code Online (Sandbox Code Playgroud)

uniform_filter1d允许多种方式来处理'reflect'默认的边框,但在我的情况下,我更喜欢'nearest'.

它也相当快(比快了近50倍np.convolve):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop

%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop
Run Code Online (Sandbox Code Playgroud)

  • 这是似乎考虑到边界问题的唯一答案(相当重要,特别是在绘图时)。谢谢你! (5认同)
  • 我分析了“uniform_filter1d”、带有矩形的“np.convolve”和“np.cumsum”,然后是“np.subtract”。我的结果:(1.)卷积是最慢的。(2.) cumsum/subtract 大约快 20-30 倍。(3.)uniform_filter1d 比 cumsum/subtract 快大约 2-3 倍。**获胜者绝对是uniform_filter1d。** (2认同)
  • 使用 `uniform_filter1d` **比 `cumsum` 解决方案更快**(大约 2-5 倍)。和`uniform_filter1d`[**不会像`cumsum`**那样出现大量浮点错误](/sf/ask/960987471/#comment82346417_27681394)解决方案做。 (2认同)

Cla*_*sen 7

聚会有点晚了,但我已经做了我自己的小函数,它不会用零包裹末端或垫,然后也可以用来找到平均值。进一步的处理是,它还在线性间隔的点处重新采样信号。随意自定义代码以获得其他功能。

该方法是具有归一化高斯核的简单矩阵乘法。

def running_mean(y_in, x_in, N_out=101, sigma=1):
    '''
    Returns running mean as a Bell-curve weighted average at evenly spaced
    points. Does NOT wrap signal around, or pad with zeros.
    
    Arguments:
    y_in -- y values, the values to be smoothed and re-sampled
    x_in -- x values for array
    
    Keyword arguments:
    N_out -- NoOf elements in resampled array.
    sigma -- 'Width' of Bell-curve in units of param x .
    '''
    import numpy as np
    N_in = len(y_in)

    # Gaussian kernel
    x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
    x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
    gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
    # Normalize kernel, such that the sum is one along axis 1
    normalization = np.tile(np.reshape(np.sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
    gauss_kernel_normalized = gauss_kernel / normalization
    # Perform running average as a linear operation
    y_out = gauss_kernel_normalized @ y_in

    return y_out, x_out
Run Code Online (Sandbox Code Playgroud)

添加正态分布噪声的正弦信号的简单用法: 在此处输入图片说明


tim*_*geb 7

Python标准库解决方案

这个生成器函数接受一个可迭代的和一个窗口大小,N 并产生窗口内当前值的平均值。它使用 a deque,这是一种类似于列表的数据结构,但针对两个端点的快速修改 ( pop, append)进行了优化。

from collections import deque
from itertools import islice

def sliding_avg(iterable, N):        
    it = iter(iterable)
    window = deque(islice(it, N))        
    num_vals = len(window)

    if num_vals < N:
        msg = 'window size {} exceeds total number of values {}'
        raise ValueError(msg.format(N, num_vals))

    N = float(N) # force floating point division if using Python 2
    s = sum(window)
    
    while True:
        yield s/N
        try:
            nxt = next(it)
        except StopIteration:
            break
        s = s - window.popleft() + nxt
        window.append(nxt)
        
Run Code Online (Sandbox Code Playgroud)

这是正在运行的函数:

>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>> 
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0
Run Code Online (Sandbox Code Playgroud)


Kri*_*ris 6

我还没有检查过这有多快,但你可以尝试:

from collections import deque

cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum

for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)
Run Code Online (Sandbox Code Playgroud)

  • 这就是我要做的。任何人都可以批评为什么这是一个糟糕的方式? (2认同)

Ant*_*nwu 6

我觉得使用瓶颈可以很好地解决这个问题

请参阅下面的基本示例:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
Run Code Online (Sandbox Code Playgroud)
  • “ mm”是“ a”的移动平均值。

  • “窗口”是移动平均值要考虑的最大条目数。

  • “ min_count”是移动平均值(例如,对于前几个元素或数组具有nan值)要考虑的最小条目数。

好的部分是Bottleneck有助于处理nan值,并且它也非常有效。


Dmi*_*nov 5

一种使用numpy使用移动平均线的方法pandas

import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(
    lambda a,b: b/a, 
    enumerate(itertools.accumulate(sample), 1))
)
Run Code Online (Sandbox Code Playgroud)

将打印 [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]

  • 2.0 = (2)/1
  • 4.0 = (2 + 6) / 2
  • 6.0 = (2 + 6 + 10) / 3
  • ...


Gur*_*cor 5

我建议不要用numpy或scipy来更快地做到这一点:

df['data'].rolling(3).mean()
Run Code Online (Sandbox Code Playgroud)

这将采用“数据”列的3个周期的移动平均值(MA)。您还可以计算偏移的版本,例如,不包含当前单元格的版本(向后偏移一格)可以很容易地计算为:

df['data'].shift(periods=1).rolling(3).mean()
Run Code Online (Sandbox Code Playgroud)

  • 2016 年提出的解决方案使用`pandas.rolling_mean`,而我的使用`pandas.DataFrame.rolling`。您还可以使用此方法轻松计算移动 `min(), max(), sum()` 等以及 `mean()`。 (3认同)

gtc*_*der 5

上面有很多关于计算运行平均值的答案。我的回答增加了两个额外的功能:

  1. 忽略 nan 值
  2. 计算 N 个相邻值的平均值,不包括感兴趣的值本身

第二个特征对于确定哪些值与总体趋势存在一定差异特别有用。

我使用 numpy.cumsum 因为它是最省时的方法(参见上面 Alleo 的回答)。

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)
Run Code Online (Sandbox Code Playgroud)

此代码仅适用于 Ns。可以通过更改 padded_x 和 n_nan 的 np.insert 来调整奇数。

示例输出(黑色为原始,蓝色为 movavg): 原始数据(黑色)和每个值周围 10 个点的移动平均值(蓝色),不包括该值。 nan 值被忽略。

此代码可以轻松修改以删除从少于截止值 = 3 个非 nan 值计算出的所有移动平均值。

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)
Run Code Online (Sandbox Code Playgroud)

原始数据(黑色)和移动平均值(蓝色),同时忽略少于 3 个非 nan 值的任何窗口


wor*_*ise 5

mab的评论埋在上面有这种方法的答案之一中。 bottleneckmove_mean这是一个简单的移动平均线:

import numpy as np
import bottleneck as bn

a = np.arange(10) + np.random.random(10)

mva = bn.move_mean(a, window=2, min_count=1)
Run Code Online (Sandbox Code Playgroud)

min_count是一个方便的参数,它基本上会将移动平均线带到数组中的那个点。如果您不设置min_count,它将等于window,并且所有window点数都将是nan


gre*_*tec 5

使用@Aikude 的变量,我写了一行。

import numpy as np

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3

mean = [np.mean(mylist[x:x+N]) for x in range(len(mylist)-N+1)]
print(mean)

>>> [2.0, 3.0, 4.0, 5.0, 6.0]
Run Code Online (Sandbox Code Playgroud)


Her*_*ert 5

上述所有解决方案都很糟糕,因为它们缺乏

  • 由于本机 python 而不是 numpy 矢量化实现,速度更快,
  • 由于使用不当而导致的数值稳定性numpy.cumsum,或
  • 由于O(len(x) * w)作为卷积实现的速度。

给定的

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000
Run Code Online (Sandbox Code Playgroud)

请注意,x_[:w].sum()等于x[:w-1].sum(). 因此,对于所述第一平均的numpy.cumsum(...)增加x[w] / w(通过x_[w+1] / w),并减去0(从x_[0] / w)。这导致x[0:w].mean()

通过 cumsum,您将通过额外添加x[w+1] / w和减去 来更新第二个平均值x[0] / w,从而得到x[1:w+1].mean()

这一直持续到x[-w:].mean()达到。

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w
Run Code Online (Sandbox Code Playgroud)

该解决方案是矢量化的、O(m)可读的且数值稳定的。