She*_*284 164 python numpy matplotlib scipy python-2.7
是否有一个scipy函数或numpy函数或模块用于python,在给定特定窗口的情况下计算一维数组的运行平均值?
lap*_*pis 214
UPD:Alleo和jasaarim提出了更有效的解决方案.
你可以用np.convolve它:
np.convolve(x, np.ones((N,))/N, mode='valid')
Run Code Online (Sandbox Code Playgroud)
运行平均值是卷积数学运算的一个例子.对于运行平均值,您沿着输入滑动窗口并计算窗口内容的平均值.对于离散的1D信号,卷积是相同的,除了代替计算任意线性组合的平均值,即将每个元素乘以相应的系数并将结果相加.那些系数,一个用于窗口中的每个位置,有时称为卷积核.现在,N值的算术平均值是(x_1 + x_2 + ... + x_N) / N,所以相应的内核是(1/N, 1/N, ..., 1/N),这正是我们通过使用得到的np.ones((N,))/N.
该mode的参数np.convolve指定如何处理边缘.我在valid这里选择了这种模式,因为我认为这是大多数人对运行方式的期望,但你可能还有其他优先事项.这是一个说明模式之间差异的图:
import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()
Run Code Online (Sandbox Code Playgroud)

All*_*leo 135
卷积比直接的方法好得多,但(我猜)它使用FFT,因此非常慢.但是,特别是为计算运行平均值,以下方法工作正常
def running_mean(x, N):
cumsum = numpy.cumsum(numpy.insert(x, 0, 0))
return (cumsum[N:] - cumsum[:-N]) / float(N)
Run Code Online (Sandbox Code Playgroud)
要检查的代码
In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop
Run Code Online (Sandbox Code Playgroud)
注意numpy.allclose(result1, result2)是True,这两种方法是等效的.N越大,时间差异越大.
jas*_*rim 70
更新:下面的示例显示了pandas.rolling_mean在最近版本的pandas中删除的旧功能.下面的函数调用的现代等价物将是
In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]:
array([ 0.49815397, 0.49844183, 0.49840518, ..., 0.49488191,
0.49456679, 0.49427121])
Run Code Online (Sandbox Code Playgroud)
熊猫比NumPy或SciPy更适合这种情况.它的功能rolling_mean可以方便地完成工作.当输入是数组时,它还返回NumPy数组.
rolling_mean使用任何自定义纯Python实现很难超越性能.以下是针对两个建议解决方案的示例性能:
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: def running_mean(x, N):
...: cumsum = np.cumsum(np.insert(x, 0, 0))
...: return (cumsum[N:] - cumsum[:-N]) / N
...:
In [4]: x = np.random.random(100000)
In [5]: N = 1000
In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop
In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop
In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop
In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True
Run Code Online (Sandbox Code Playgroud)
关于如何处理边缘值也有很好的选择.
mtr*_*trw 49
您可以使用以下公式计算运行平均值
import numpy as np
def runningMean(x, N):
y = np.zeros((len(x),))
for ctr in range(len(x)):
y[ctr] = np.sum(x[ctr:(ctr+N)])
return y/N
Run Code Online (Sandbox Code Playgroud)
但它很慢.
幸运的是,numpy包含一个convolve函数,我们可以使用它来加快速度.运行平均值相当于x使用N长的向量进行卷积,所有成员都等于1/N.卷积的numpy实现包括起始瞬态,所以你必须删除前N-1个点:
def runningMeanFast(x, N):
return np.convolve(x, np.ones((N,))/N)[(N-1):]
Run Code Online (Sandbox Code Playgroud)
在我的机器上,快速版本的速度提高了20-30倍,具体取决于输入矢量的长度和平均窗口的大小.
请注意,convolve确实包含一个'same'模式,它似乎应该解决启动瞬态问题,但它会在开始和结束之间进行分割.
Aik*_*ude 20
对于一个简短,快速的解决方案,它在一个循环中完成整个过程,没有依赖关系,下面的代码非常有用.
mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []
for i, x in enumerate(mylist, 1):
cumsum.append(cumsum[i-1] + x)
if i>=N:
moving_ave = (cumsum[i] - cumsum[i-N])/N
#can do stuff with moving_ave here
moving_aves.append(moving_ave)
Run Code Online (Sandbox Code Playgroud)
lit*_*nce 19
或计算的python模块
在Tradewave.net的测试中,TA-lib总是获胜:
import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t
PAIR = info.primary_pair
PERIOD = 30
def initialize():
storage.reset()
storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])
def cumsum_sma(array, period):
ret = np.cumsum(array, dtype=float)
ret[period:] = ret[period:] - ret[:-period]
return ret[period - 1:] / period
def pandas_sma(array, period):
return pd.rolling_mean(array, period)
def api_sma(array, period):
# this method is native to Tradewave and does NOT return an array
return (data[PAIR].ma(PERIOD))
def talib_sma(array, period):
return ta.MA(array, period)
def convolve_sma(array, period):
return np.convolve(array, np.ones((period,))/period, mode='valid')
def fftconvolve_sma(array, period):
return scipy.signal.fftconvolve(
array, np.ones((period,))/period, mode='valid')
def tick():
close = data[PAIR].warmup_period('close')
t1 = t.time()
sma_api = api_sma(close, PERIOD)
t2 = t.time()
sma_cumsum = cumsum_sma(close, PERIOD)
t3 = t.time()
sma_pandas = pandas_sma(close, PERIOD)
t4 = t.time()
sma_talib = talib_sma(close, PERIOD)
t5 = t.time()
sma_convolve = convolve_sma(close, PERIOD)
t6 = t.time()
sma_fftconvolve = fftconvolve_sma(close, PERIOD)
t7 = t.time()
storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
storage.elapsed[-5] = storage.elapsed[-5] + t6-t5
storage.elapsed[-6] = storage.elapsed[-6] + t7-t6
plot('sma_api', sma_api)
plot('sma_cumsum', sma_cumsum[-5])
plot('sma_pandas', sma_pandas[-10])
plot('sma_talib', sma_talib[-15])
plot('sma_convolve', sma_convolve[-20])
plot('sma_fftconvolve', sma_fftconvolve[-25])
def stop():
log('ticks....: %s' % info.max_ticks)
log('api......: %.5f' % storage.elapsed[-1])
log('cumsum...: %.5f' % storage.elapsed[-2])
log('pandas...: %.5f' % storage.elapsed[-3])
log('talib....: %.5f' % storage.elapsed[-4])
log('convolve.: %.5f' % storage.elapsed[-5])
log('fft......: %.5f' % storage.elapsed[-6])
Run Code Online (Sandbox Code Playgroud)
结果:
[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700 # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306
Run Code Online (Sandbox Code Playgroud)
Han*_*ann 18
有关即用型解决方案,请参阅https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html.它提供flat窗口类型的运行平均值.请注意,这比简单的自己动手的卷积方法要复杂一些,因为它试图通过反映它来处理数据开头和结尾的问题(在您的情况下可能会或可能不会. ..).
首先,您可以尝试:
a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)
Run Code Online (Sandbox Code Playgroud)
NeX*_*XuS 15
我知道这是一个老问题,但这是一个不使用任何额外数据结构或库的解决方案.它在输入列表的元素数量上是线性的,我无法想到任何其他方式来提高它的效率(实际上,如果有人知道更好的方法来分配结果,请告诉我).
注意:使用numpy数组而不是列表会更快,但我想消除所有依赖项.通过多线程执行也可以提高性能
该函数假定输入列表是一维的,所以要小心.
### Running mean/Moving average
def running_mean(l, N):
sum = 0
result = list( 0 for x in l)
for i in range( 0, N ):
sum = sum + l[i]
result[i] = sum / (i+1)
for i in range( N, len(l) ):
sum = sum - l[i-N] + l[i]
result[i] = sum / N
return result
Run Code Online (Sandbox Code Playgroud)
moi*_*moi 11
如果保持输入的尺寸很重要(而不是将输出限制在'valid'卷积区域),则可以使用scipy.ndimage.filters.uniform_filter1d:
import numpy as np
from scipy.ndimage.filters import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)
y.shape == x.shape
>>> True
Run Code Online (Sandbox Code Playgroud)
uniform_filter1d允许多种方式来处理'reflect'默认的边框,但在我的情况下,我更喜欢'nearest'.
它也相当快(比快了近50倍np.convolve):
%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop
%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop
Run Code Online (Sandbox Code Playgroud)
聚会有点晚了,但我已经做了我自己的小函数,它不会用零包裹末端或垫,然后也可以用来找到平均值。进一步的处理是,它还在线性间隔的点处重新采样信号。随意自定义代码以获得其他功能。
该方法是具有归一化高斯核的简单矩阵乘法。
def running_mean(y_in, x_in, N_out=101, sigma=1):
'''
Returns running mean as a Bell-curve weighted average at evenly spaced
points. Does NOT wrap signal around, or pad with zeros.
Arguments:
y_in -- y values, the values to be smoothed and re-sampled
x_in -- x values for array
Keyword arguments:
N_out -- NoOf elements in resampled array.
sigma -- 'Width' of Bell-curve in units of param x .
'''
import numpy as np
N_in = len(y_in)
# Gaussian kernel
x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
# Normalize kernel, such that the sum is one along axis 1
normalization = np.tile(np.reshape(np.sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
gauss_kernel_normalized = gauss_kernel / normalization
# Perform running average as a linear operation
y_out = gauss_kernel_normalized @ y_in
return y_out, x_out
Run Code Online (Sandbox Code Playgroud)
Python标准库解决方案
这个生成器函数接受一个可迭代的和一个窗口大小,N 并产生窗口内当前值的平均值。它使用 a deque,这是一种类似于列表的数据结构,但针对两个端点的快速修改 ( pop, append)进行了优化。
from collections import deque
from itertools import islice
def sliding_avg(iterable, N):
it = iter(iterable)
window = deque(islice(it, N))
num_vals = len(window)
if num_vals < N:
msg = 'window size {} exceeds total number of values {}'
raise ValueError(msg.format(N, num_vals))
N = float(N) # force floating point division if using Python 2
s = sum(window)
while True:
yield s/N
try:
nxt = next(it)
except StopIteration:
break
s = s - window.popleft() + nxt
window.append(nxt)
Run Code Online (Sandbox Code Playgroud)
这是正在运行的函数:
>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>>
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0
Run Code Online (Sandbox Code Playgroud)
我还没有检查过这有多快,但你可以尝试:
from collections import deque
cache = deque() # keep track of seen values
n = 10 # window size
A = xrange(100) # some dummy iterable
cum_sum = 0 # initialize cumulative sum
for t, val in enumerate(A, 1):
cache.append(val)
cum_sum += val
if t < n:
avg = cum_sum / float(t)
else: # if window is saturated,
cum_sum -= cache.popleft() # subtract oldest value
avg = cum_sum / float(n)
Run Code Online (Sandbox Code Playgroud)
我觉得使用瓶颈可以很好地解决这个问题
请参阅下面的基本示例:
import numpy as np
import bottleneck as bn
a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
Run Code Online (Sandbox Code Playgroud)
“ mm”是“ a”的移动平均值。
“窗口”是移动平均值要考虑的最大条目数。
“ min_count”是移动平均值(例如,对于前几个元素或数组具有nan值)要考虑的最小条目数。
好的部分是Bottleneck有助于处理nan值,并且它也非常有效。
另一种不使用numpy或不使用移动平均线的方法pandas
import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(
lambda a,b: b/a,
enumerate(itertools.accumulate(sample), 1))
)
Run Code Online (Sandbox Code Playgroud)
将打印 [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]
我建议不要用numpy或scipy来更快地做到这一点:
df['data'].rolling(3).mean()
Run Code Online (Sandbox Code Playgroud)
这将采用“数据”列的3个周期的移动平均值(MA)。您还可以计算偏移的版本,例如,不包含当前单元格的版本(向后偏移一格)可以很容易地计算为:
df['data'].shift(periods=1).rolling(3).mean()
Run Code Online (Sandbox Code Playgroud)
上面有很多关于计算运行平均值的答案。我的回答增加了两个额外的功能:
第二个特征对于确定哪些值与总体趋势存在一定差异特别有用。
我使用 numpy.cumsum 因为它是最省时的方法(参见上面 Alleo 的回答)。
N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x)
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)
Run Code Online (Sandbox Code Playgroud)
此代码仅适用于 Ns。可以通过更改 padded_x 和 n_nan 的 np.insert 来调整奇数。
此代码可以轻松修改以删除从少于截止值 = 3 个非 nan 值计算出的所有移动平均值。
window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)
Run Code Online (Sandbox Code Playgroud)
mab的评论埋在上面有这种方法的答案之一中。 bottleneck有move_mean这是一个简单的移动平均线:
import numpy as np
import bottleneck as bn
a = np.arange(10) + np.random.random(10)
mva = bn.move_mean(a, window=2, min_count=1)
Run Code Online (Sandbox Code Playgroud)
min_count是一个方便的参数,它基本上会将移动平均线带到数组中的那个点。如果您不设置min_count,它将等于window,并且所有window点数都将是nan。
使用@Aikude 的变量,我写了一行。
import numpy as np
mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
mean = [np.mean(mylist[x:x+N]) for x in range(len(mylist)-N+1)]
print(mean)
>>> [2.0, 3.0, 4.0, 5.0, 6.0]
Run Code Online (Sandbox Code Playgroud)
上述所有解决方案都很糟糕,因为它们缺乏
numpy.cumsum,或O(len(x) * w)作为卷积实现的速度。给定的
import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000
Run Code Online (Sandbox Code Playgroud)
请注意,x_[:w].sum()等于x[:w-1].sum(). 因此,对于所述第一平均的numpy.cumsum(...)增加x[w] / w(通过x_[w+1] / w),并减去0(从x_[0] / w)。这导致x[0:w].mean()
通过 cumsum,您将通过额外添加x[w+1] / w和减去 来更新第二个平均值x[0] / w,从而得到x[1:w+1].mean()。
这一直持续到x[-w:].mean()达到。
x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w
Run Code Online (Sandbox Code Playgroud)
该解决方案是矢量化的、O(m)可读的且数值稳定的。
| 归档时间: |
|
| 查看次数: |
293000 次 |
| 最近记录: |