Rad*_*duS 27 python performance numpy vectorization pandas
如何获得NumPy中的指数加权移动平均值,就像下面的熊猫一样?
import pandas as pd
import pandas_datareader as pdr
from datetime import datetime
# Declare variables
ibm = pdr.get_data_yahoo(symbols='IBM', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)['Adj Close']
windowSize = 20
# Get PANDAS exponential weighted moving average
ewm_pd = pd.DataFrame(ibm).ewm(span=windowSize, min_periods=windowSize).mean().as_matrix()
print(ewm_pd)
Run Code Online (Sandbox Code Playgroud)
我用NumPy尝试了以下内容
import numpy as np
import pandas_datareader as pdr
from datetime import datetime
# From this post: http://stackoverflow.com/a/40085052/3293881 by @Divakar
def strided_app(a, L, S): # Window len = L, Stride len/stepsize = S
nrows = ((a.size - L) // S) + 1
n = a.strides[0]
return np.lib.stride_tricks.as_strided(a, shape=(nrows, L), strides=(S * n, n))
def numpyEWMA(price, windowSize):
weights = np.exp(np.linspace(-1., 0., windowSize))
weights /= weights.sum()
a2D = strided_app(price, windowSize, 1)
returnArray = np.empty((price.shape[0]))
returnArray.fill(np.nan)
for index in (range(a2D.shape[0])):
returnArray[index + windowSize-1] = np.convolve(weights, a2D[index])[windowSize - 1:-windowSize + 1]
return np.reshape(returnArray, (-1, 1))
# Declare variables
ibm = pdr.get_data_yahoo(symbols='IBM', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)['Adj Close']
windowSize = 20
# Get NumPy exponential weighted moving average
ewma_np = numpyEWMA(ibm, windowSize)
print(ewma_np)
Run Code Online (Sandbox Code Playgroud)
但结果与大熊猫的结果并不相似.
是否有更好的方法可以直接在NumPy中计算指数加权移动平均值并获得与pandas.ewm().mean()?相同的结果?
在大熊猫解决方案的60,000个请求中,我得到大约230秒.我确信,使用纯NumPy,这可以显着降低.
Div*_*kar 32
我想我终于搞砸了!
这是一个numpy_ewma功能的矢量化版本,声称可以从@RaduS's post- 产生正确的结果-
def numpy_ewma_vectorized(data, window):
alpha = 2 /(window + 1.0)
alpha_rev = 1-alpha
scale = 1/alpha_rev
n = data.shape[0]
r = np.arange(n)
scale_arr = scale**r
offset = data[0]*alpha_rev**(r+1)
pw0 = alpha*alpha_rev**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = offset + cumsums*scale_arr[::-1]
return out
Run Code Online (Sandbox Code Playgroud)
进一步提升
我们可以通过一些代码重用来进一步提升它,就像这样 -
def numpy_ewma_vectorized_v2(data, window):
alpha = 2 /(window + 1.0)
alpha_rev = 1-alpha
n = data.shape[0]
pows = alpha_rev**(np.arange(n+1))
scale_arr = 1/pows[:-1]
offset = data[0]*pows[1:]
pw0 = alpha*alpha_rev**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = offset + cumsums*scale_arr[::-1]
return out
Run Code Online (Sandbox Code Playgroud)
运行时测试
让我们将这两个时间用于大数据集的相同循环函数.
In [97]: data = np.random.randint(2,9,(5000))
...: window = 20
...:
In [98]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized(data, window))
Out[98]: True
In [99]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized_v2(data, window))
Out[99]: True
In [100]: %timeit numpy_ewma(data, window)
100 loops, best of 3: 6.03 ms per loop
In [101]: %timeit numpy_ewma_vectorized(data, window)
1000 loops, best of 3: 665 µs per loop
In [102]: %timeit numpy_ewma_vectorized_v2(data, window)
1000 loops, best of 3: 357 µs per loop
In [103]: 6030/357.0
Out[103]: 16.89075630252101
Run Code Online (Sandbox Code Playgroud)
大约有17倍的加速!
这是一个使用NumPy的实现,相当于使用df.ewm(alpha=alpha).mean().阅读文档后,它只是一些矩阵操作.诀窍是构建正确的矩阵.
值得注意的是,因为我们正在创建浮动矩阵,所以如果输入数组太大,您可以快速吞噬内存.
import pandas as pd
import numpy as np
def ewma(x, alpha):
'''
Returns the exponentially weighted moving average of x.
Parameters:
-----------
x : array-like
alpha : float {0 <= alpha <= 1}
Returns:
--------
ewma: numpy array
the exponentially weighted moving average
'''
# Coerce x to an array
x = np.array(x)
n = x.size
# Create an initial weight matrix of (1-alpha), and a matrix of powers
# to raise the weights by
w0 = np.ones(shape=(n,n)) * (1-alpha)
p = np.vstack([np.arange(i,i-n,-1) for i in range(n)])
# Create the weight matrix
w = np.tril(w0**p,0)
# Calculate the ewma
return np.dot(w, x[::np.newaxis]) / w.sum(axis=1)
Run Code Online (Sandbox Code Playgroud)
让我们测试一下:
alpha = 0.55
x = np.random.randint(0,30,15)
df = pd.DataFrame(x, columns=['A'])
df.ewm(alpha=alpha).mean()
# returns:
# A
# 0 13.000000
# 1 22.655172
# 2 20.443268
# 3 12.159796
# 4 14.871955
# 5 15.497575
# 6 20.743511
# 7 20.884818
# 8 24.250715
# 9 18.610901
# 10 17.174686
# 11 16.528564
# 12 17.337879
# 13 7.801912
# 14 12.310889
ewma(x=x, alpha=alpha)
# returns:
# array([ 13. , 22.65517241, 20.44326778, 12.1597964 ,
# 14.87195534, 15.4975749 , 20.74351117, 20.88481763,
# 24.25071484, 18.61090129, 17.17468551, 16.52856393,
# 17.33787888, 7.80191235, 12.31088889])
Run Code Online (Sandbox Code Playgroud)
pandas问题是严格要求一个numpy解决方案,然而,似乎OP实际上只是在一个纯粹的numpy解决方案之后加速运行时.
我解决了类似的问题,但转而研究numba.jit大规模加速计算时间
In [24]: a = np.random.random(10**7)
...: df = pd.Series(a)
In [25]: %timeit numpy_ewma(a, 10) # /a/42915307/4013571
...: %timeit df.ewm(span=10).mean() # pandas
...: %timeit numpy_ewma_vectorized_v2(a, 10) # best w/o numba: /a/42926270/4013571
...: %timeit _ewma(a, 10) # fastest accurate (below)
...: %timeit _ewma_infinite_hist(a, 10) # fastest overall (below)
4.14 s ± 116 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
991 ms ± 52.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
396 ms ± 8.39 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
181 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
39.6 ms ± 979 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Run Code Online (Sandbox Code Playgroud)
缩小到较小的数组a = np.random.random(100)(结果相同)
41.6 µs ± 491 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
945 ms ± 12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
16 µs ± 93.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
1.66 µs ± 13.7 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
1.14 µs ± 5.57 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
Run Code Online (Sandbox Code Playgroud)
值得指出的是,我的下面的函数与pandas(与docstr中的示例)完全一致,而这里的一些答案采用了各种不同的近似.例如,
In [57]: print(pd.DataFrame([1,2,3]).ewm(span=2).mean().values.ravel())
...: print(numpy_ewma_vectorized_v2(np.array([1,2,3]), 2))
...: print(numpy_ewma(np.array([1,2,3]), 2))
[1. 1.75 2.61538462]
[1. 1.66666667 2.55555556]
[1. 1.18181818 1.51239669]
Run Code Online (Sandbox Code Playgroud)
我为自己的库记录的源代码
import numpy as np
from numba import jit
from numba import float64
from numba import int64
@jit((float64[:], int64), nopython=True, nogil=True)
def _ewma(arr_in, window):
r"""Exponentialy weighted moving average specified by a decay ``window``
to provide better adjustments for small windows via:
y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
(1 + (1-a) + (1-a)^2 + ... + (1-a)^n).
Parameters
----------
arr_in : np.ndarray, float64
A single dimenisional numpy array
window : int64
The decay window, or 'span'
Returns
-------
np.ndarray
The EWMA vector, same length / shape as ``arr_in``
Examples
--------
>>> import pandas as pd
>>> a = np.arange(5, dtype=float)
>>> exp = pd.DataFrame(a).ewm(span=10, adjust=True).mean()
>>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel())
True
"""
n = arr_in.shape[0]
ewma = np.empty(n, dtype=float64)
alpha = 2 / float(window + 1)
w = 1
ewma_old = arr_in[0]
ewma[0] = ewma_old
for i in range(1, n):
w += (1-alpha)**i
ewma_old = ewma_old*(1-alpha) + arr_in[i]
ewma[i] = ewma_old / w
return ewma
@jit((float64[:], int64), nopython=True, nogil=True)
def _ewma_infinite_hist(arr_in, window):
r"""Exponentialy weighted moving average specified by a decay ``window``
assuming infinite history via the recursive form:
(2) (i) y[0] = x[0]; and
(ii) y[t] = a*x[t] + (1-a)*y[t-1] for t>0.
This method is less accurate that ``_ewma`` but
much faster:
In [1]: import numpy as np, bars
...: arr = np.random.random(100000)
...: %timeit bars._ewma(arr, 10)
...: %timeit bars._ewma_infinite_hist(arr, 10)
3.74 ms ± 60.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
262 µs ± 1.54 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Parameters
----------
arr_in : np.ndarray, float64
A single dimenisional numpy array
window : int64
The decay window, or 'span'
Returns
-------
np.ndarray
The EWMA vector, same length / shape as ``arr_in``
Examples
--------
>>> import pandas as pd
>>> a = np.arange(5, dtype=float)
>>> exp = pd.DataFrame(a).ewm(span=10, adjust=False).mean()
>>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel())
True
"""
n = arr_in.shape[0]
ewma = np.empty(n, dtype=float64)
alpha = 2 / float(window + 1)
ewma[0] = arr_in[0]
for i in range(1, n):
ewma[i] = arr_in[i] * alpha + ewma[i-1] * (1 - alpha)
return ewma
Run Code Online (Sandbox Code Playgroud)
更新于08/06/2019
大型输入的纯,快速和保护的解决方案
out用于原位计算的
dtype参数,参数,索引order参数
此功能等效于pandas ewm(adjust=False).mean(),但速度更快。ewm(adjust=True).mean()(熊猫的默认设置)在结果开始时会产生不同的值。我正在努力adjust为该解决方案添加功能。
当输入太大时,@ Divakar的答案会导致浮点精度问题。这是因为,(1-alpha)**(n+1) -> 0当n -> inf和时alpha -> 1,会导致被零除,并且NaN在计算中会弹出值。
这是我最快的解决方案,几乎没有向量化,也没有精度问题。它有点复杂,但是性能却很好,尤其是对于非常庞大的输入。不使用就地计算(可以使用out参数来节省内存分配时间):在相当老的PC上,对于100M元素输入向量为3.62秒,对于100K元素输入向量为3.2ms,对于5000个元素输入向量为293µs (结果会随alpha/ row_size值的不同而有所不同)。
# tested with python3 & numpy 1.15.2
import numpy as np
def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order='C', out=None):
"""
Reshapes data before calculating EWMA, then iterates once over the rows
to calculate the offset without precision issues
:param data: Input data, will be flattened.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param row_size: int, optional
The row size to use in the computation. High row sizes need higher precision,
low values will impact performance. The optimal value depends on the
platform and the alpha being used. Higher alpha values require lower
row size. Default depends on dtype.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
:return: The flattened result.
"""
data = np.array(data, copy=False)
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float
else:
dtype = np.dtype(dtype)
row_size = int(row_size) if row_size is not None
else get_max_row_size(alpha, dtype)
if data.size <= row_size:
# The normal function can handle this input, use that
return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)
if data.ndim > 1:
# flatten input
data = np.reshape(data, -1, order=order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
row_n = int(data.size // row_size) # the number of rows to use
trailing_n = int(data.size % row_size) # the amount of data leftover
first_offset = data[0]
if trailing_n > 0:
# set temporary results to slice view of out parameter
out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))
data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))
else:
out_main_view = out
data_main_view = data
# get all the scaled cumulative sums with 0 offset
ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype,
order='C', out=out_main_view)
scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1)
last_scaling_factor = scaling_factors[-1]
# create offset array
offsets = np.empty(out_main_view.shape[0], dtype=dtype)
offsets[0] = first_offset
# iteratively calculate offset for each row
for i in range(1, out_main_view.shape[0]):
offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]
# add the offsets to the result
out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]
if trailing_n > 0:
# process trailing data in the 2nd slice of the out parameter
ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1],
dtype=dtype, order='C', out=out[-trailing_n:])
return out
def get_max_row_size(alpha, dtype=float):
assert 0. <= alpha < 1.
# This will return the maximum row size possible on
# your platform for the given dtype. I can find no impact on accuracy
# at this value on my machine.
# Might not be the optimal value for speed, which is hard to predict
# due to numpy's optimizations
# Use np.finfo(dtype).eps if you are worried about accuracy
# and want to be extra safe.
epsilon = np.finfo(dtype).tiny
# If this produces an OverflowError, make epsilon larger
return int(np.log(epsilon)/np.log(1-alpha)) + 1
Run Code Online (Sandbox Code Playgroud)
一维ewma函数:
def ewma_vectorized(data, alpha, offset=None, dtype=None, order='C', out=None):
"""
Calculates the exponential moving average over a vector.
Will fail for large inputs.
:param data: Input data
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param offset: optional
The offset for the moving average, scalar. Defaults to data[0].
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the input. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
if data.ndim > 1:
# flatten input
data = data.reshape(-1, order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if offset is None:
offset = data[0]
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# scaling_factors -> 0 as len(data) gets large
# this leads to divide-by-zeros below
scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype),
dtype=dtype)
# create cumulative sum array
np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1],
dtype=dtype, out=out)
np.cumsum(out, dtype=dtype, out=out)
# cumsums / scaling
out /= scaling_factors[-2::-1]
if offset != 0:
offset = np.array(offset, copy=False).astype(dtype, copy=False)
# add offsets
out += offset * scaling_factors[1:]
return out
Run Code Online (Sandbox Code Playgroud)
2D ewma函数:
def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order='C', out=None):
"""
Calculates the exponential moving average over a given axis.
:param data: Input data, must be 1D or 2D array.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param axis: The axis to apply the moving average on.
If axis==None, the data is flattened.
:param offset: optional
The offset for the moving average. Must be scalar or a
vector with one element for each row of data. If set to None,
defaults to the first value of each row.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Ignored if axis is not None.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)
assert data.ndim <= 2
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if axis is None or data.ndim < 2:
# use 1D version
if isinstance(offset, np.ndarray):
offset = offset[0]
return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order,
out=out)
assert -data.ndim <= axis < data.ndim
# create reshaped data views
out_view = out
if axis < 0:
axis = data.ndim - int(axis)
if axis == 0:
# transpose data views so columns are treated as rows
data = data.T
out_view = out_view.T
if offset is None:
# use the first element of each row as the offset
offset = np.copy(data[:, 0])
elif np.size(offset) == 1:
offset = np.reshape(offset, (1,))
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# calculate the moving average
row_size = data.shape[1]
row_n = data.shape[0]
scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype),
dtype=dtype)
# create a scaled cumulative sum array
np.multiply(
data,
np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype),
dtype=dtype)
/ scaling_factors[np.newaxis, :-1],
dtype=dtype, out=out_view
)
np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)
out_view /= scaling_factors[np.newaxis, -2::-1]
if not (np.size(offset) == 1 and offset == 0):
offset = offset.astype(dtype, copy=False)
# add the offsets to the scaled cumulative sums
out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]
return out
Run Code Online (Sandbox Code Playgroud)
用法:
data_n = 100000000
data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100
span = 5000 # span >= 1
alpha = 2/(span+1) # for pandas` span parameter
# com = 1000 # com >= 0
# alpha = 1/(1+com) # for pandas` center-of-mass parameter
# halflife = 100 # halflife > 0
# alpha = 1 - np.exp(np.log(0.5)/halflife) # for pandas` half-life parameter
result = ewma_vectorized_safe(data, alpha)
Run Code Online (Sandbox Code Playgroud)
只是一个提示
根据给定alpha窗口中数据对平均值的贡献,可以很容易地计算给定“窗口大小”(技术上的指数平均值具有无限的“窗口”)。例如,这对于选择由于边界效应而将结果的起始部分视为不可靠的情况很有用。
def window_size(alpha, sum_proportion):
# Increases with increased sum_proportion and decreased alpha
# solve (1-alpha)**window_size = (1-sum_proportion) for window_size
return int(np.log(1-sum_proportion) / np.log(1-alpha))
alpha = 0.02
sum_proportion = .99 # window covers 99% of contribution to the moving average
window = window_size(alpha, sum_proportion) # = 227
sum_proportion = .75 # window covers 75% of contribution to the moving average
window = window_size(alpha, sum_proportion) # = 68
Run Code Online (Sandbox Code Playgroud)
alpha = 2 / (window_size + 1.0)此线程中使用的关系(pandas的'span'选项)与上述函数(带有sum_proportion~=0.87)的逆函数非常近似。alpha = 1 - np.exp(np.log(1-sum_proportion)/window_size)更准确(pandas的“半衰期”选项等于sum_proportion=0.5)。
在下面的示例中,data代表一个连续的噪声信号。cutoff_idx是第一个位置,result其中至少99%的值取决于其中的单独值data(即小于1%的值取决于data [0])。直到cutoff_idx最终的数据都被排除在数据之外,因为它太依赖于中的第一个值data,因此可能会扭曲平均值。
result = ewma_vectorized_safe(data, alpha, chunk_size)
sum_proportion = .99
cutoff_idx = window_size(alpha, sum_proportion)
result = result[cutoff_idx:]
Run Code Online (Sandbox Code Playgroud)
为了说明上面解决的问题,您可以运行几次,请注意红线经常出现的错误的开始,在以下地方跳过cutoff_idx:
data_n = 100000
data = np.random.rand(data_n) * 100
window = 1000
sum_proportion = .99
alpha = 1 - np.exp(np.log(1-sum_proportion)/window)
result = ewma_vectorized_safe(data, alpha)
cutoff_idx = window_size(alpha, sum_proportion)
x = np.arange(start=0, stop=result.size)
import matplotlib.pyplot as plt
plt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], '-r',
x[cutoff_idx:], result[cutoff_idx:], '-b')
plt.show()
Run Code Online (Sandbox Code Playgroud)
请注意,cutoff_idx==window因为alpha是使用window_size()函数的反函数设置的,所以具有相同的sum_proportion。这类似于大熊猫的用法ewm(span=window, min_periods=window)。
给出alpha和windowSize,这是一种模拟NumPy上相应行为的方法 -
def numpy_ewm_alpha(a, alpha, windowSize):
wghts = (1-alpha)**np.arange(windowSize)
wghts /= wghts.sum()
out = np.full(df.shape[0],np.nan)
out[windowSize-1:] = np.convolve(a,wghts,'valid')
return out
Run Code Online (Sandbox Code Playgroud)
样本运行以进行验证 -
In [54]: alpha = 0.55
...: windowSize = 20
...:
In [55]: df = pd.DataFrame(np.random.randint(2,9,(100)))
In [56]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel()
...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1)))
...:
Max. error : 5.10531254605e-07
In [57]: alpha = 0.75
...: windowSize = 30
...:
In [58]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel()
...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1)))
Max. error : 8.881784197e-16
Run Code Online (Sandbox Code Playgroud)
更大的数据集上的运行时测试 -
In [61]: alpha = 0.55
...: windowSize = 20
...:
In [62]: df = pd.DataFrame(np.random.randint(2,9,(10000)))
In [63]: %timeit df.ewm(alpha = alpha, min_periods=windowSize).mean()
1000 loops, best of 3: 851 µs per loop
In [64]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
1000 loops, best of 3: 204 µs per loop
Run Code Online (Sandbox Code Playgroud)
进一步提升
为了进一步提升性能,我们可以避免使用NaN进行初始化,而是使用从中输出的数组np.convolve,如下所示 -
def numpy_ewm_alpha_v2(a, alpha, windowSize):
wghts = (1-alpha)**np.arange(windowSize)
wghts /= wghts.sum()
out = np.convolve(a,wghts)
out[:windowSize-1] = np.nan
return out[:a.size]
Run Code Online (Sandbox Code Playgroud)
计时 -
In [117]: alpha = 0.55
...: windowSize = 20
...:
In [118]: df = pd.DataFrame(np.random.randint(2,9,(10000)))
In [119]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
1000 loops, best of 3: 204 µs per loop
In [120]: %timeit numpy_ewm_alpha_v2(df.values.ravel(), alpha = alpha, windowSize = windowSize)
10000 loops, best of 3: 195 µs per loop
Run Code Online (Sandbox Code Playgroud)
避免 numba 并且在Alexander McFarlane 的大型阵列解决方案的因子 2 内的一个非常简单的解决方案是使用 scipy 的lfilter函数(因为 EWMA 是线性过滤器):
from scipy.signal import lfiltic, lfilter
# careful not to mix between scipy.signal and standard python signal
# (https://docs.python.org/3/library/signal.html) if your code handles some processes
def ewma_linear_filter(array, window):
alpha = 2 /(window + 1)
b = [alpha]
a = [1, alpha-1]
zi = lfiltic(b, a, array[0:1], [0])
return lfilter(b, a, array, zi=zi)[0]
Run Code Online (Sandbox Code Playgroud)
时间安排如下:
window = 100 # doesn't have any impact on run time
for n in [1000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000]:
data = np.random.normal(0, 1, n)
print(f'n={n:,d}, window={window}')
%timeit _ewma_infinite_hist(data, window)
%timeit ewma_linear_filter(data, window)
print()
n=1,000, window=100
5.01 µs ± 23.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
58.4 µs ± 1.05 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
n=10,000, window=100
39 µs ± 101 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
134 µs ± 387 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
n=100,000, window=100
373 µs ± 2.56 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
845 µs ± 2.27 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
n=1,000,000, window=100
5.35 ms ± 22 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
9.77 ms ± 78.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
n=10000000, window=100
53.7 ms ± 200 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
96.6 ms ± 2.28 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
n=10,0000,000, window=100
547 ms ± 5.02 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
963 ms ± 4.52 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)
小智 5
这个答案似乎无关紧要。但是,对于那些还需要使用 NumPy 计算指数加权方差(以及标准差)的人,以下解决方案将很有用:
import numpy as np
def ew(a, alpha, winSize):
_alpha = 1 - alpha
ws = _alpha ** np.arange(winSize)
w_sum = ws.sum()
ew_mean = np.convolve(a, ws)[winSize - 1] / w_sum
bias = (w_sum ** 2) / ((w_sum ** 2) - (ws ** 2).sum())
ew_var = (np.convolve((a - ew_mean) ** 2, ws)[winSize - 1] / w_sum) * bias
ew_std = np.sqrt(ew_var)
return (ew_mean, ew_var, ew_std)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
24144 次 |
| 最近记录: |