Arnaud Legoux Moving Average and numpy

Pro*_*hii 5 python numpy vectorization pandas

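I would like to write a vectorized version of code that calculates the Arnaud Legoux Moving Average using NumPy (or Pandas). Could you help me with this, please? Thanks.

The non-vectorized version is shown below.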

import numpy as np

def NPALMA(pnp_array, **kwargs):
    '''
    ALMA - Arnaud Legoux Moving Average,
    http://www.financial-hacker.com/trend-delusion-or-reality/
    https://github.com/darwinsys/Trading_Strategies/blob/master/ML/Features.py
    '''
    length = kwargs['length']
    # just some number (6.0 is useful)
    sigma = kwargs['sigma']
    # sensitivity (close to 1) or smoothness (close to 0)
    offset = kwargs['offset']

    asize = length - 1
    m = offset * asize
    s = length  / sigma
    dss = 2 * s * s

    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp( -(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]

    return alma

Div*_*kar 4

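Starting approach

We can create sliding windows along the first axis and then use tensor multiplication with the array of wtd values to get the sum-reductions.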

The implementation would look something like this -

# Get all wtd values in an array
wtds = np.exp(-(np.arange(length) - m)**2/dss)

# Get the sliding windows for input array along first axis
pnp_array3D = strided_axis0(pnp_array,len(wtds))

# Initialize o/p array
out = np.zeros(pnp_array.shape)

# Get sum-reductions for the windows which don't need wrapping over
out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]

# The first valid row (index length-1) wraps around to the last input row.
# So, do it separately.
out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])

# Finally perform the divisions
out /= wtds.sum()

The sliding-window helper, strided_axis0, comes from a linked answer.
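Since that helper is not reproduced here, the following is a minimal sketch of what a function like strided_axis0 could look like for a 2D input. It assumes the result has shape (number of windows, L, number of columns), which is the layout the tensordot call above expects. It is built on np.lib.stride_tricks.as_strided and may differ from the linked original.

```python
import numpy as np

def strided_axis0(a, L):
    """Sliding windows of length L along axis 0 of a 2D array (read-only view).

    Assumed layout: result[k] is a[k:k+L], so the result has shape
    (a.shape[0] - L + 1, L, a.shape[1]).
    """
    a = np.asarray(a)
    n_windows = a.shape[0] - L + 1
    n_cols = a.shape[1]
    s0, s1 = a.strides
    # Moving to the next window and moving inside a window both advance
    # one row, hence the repeated s0 stride. No data is copied.
    return np.lib.stride_tricks.as_strided(
        a, shape=(n_windows, L, n_cols), strides=(s0, s0, s1), writeable=False
    )

a = np.arange(12.0).reshape(6, 2)
windows = strided_axis0(a, 3)
print(windows.shape)  # (4, 3, 2)
```

Because the result is only a view, memory use grows once an operation such as tensordot has to materialize the windows, which is what motivates the convolution route.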

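Boost with 1D convolution

Those multiplications with the wtds values and their sum-reductions are basically a convolution along the first axis. As such, we can use scipy.ndimage.convolve1d along axis=0. This is much faster and more memory efficient, because we don't create huge sliding windows.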

The implementation would be -

from scipy.ndimage import convolve1d as conv

avgs = conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')

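Thus, the non-zero rows of out[length-1:] correspond to avgs[:-length+1]. Because convolve1d flips the kernel and centers it on each element, verify the row offset and kernel orientation against the loop version before relying on an exact match.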

There might be some precision differences if the kernel values in wtds are very small. So, keep this in mind when using the convolution method.
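One detail worth checking with the filtering route is kernel orientation and alignment: scipy.ndimage.convolve1d flips the weights and centers them on each element, while the loop applies wtds in order to a window that ends one row before the current row. The sketch below is an alternative, not the method above: it uses scipy.ndimage.correlate1d (no flip) with an explicit origin plus a one-row shift, and compares the result with a direct loop. The names alma_loop and alma_correlate are hypothetical, introduced only for this illustration, and length >= 2 is assumed.

```python
import numpy as np
from scipy.ndimage import correlate1d

def alma_loop(x, length, m, dss):
    """Reference: same arithmetic as the loop in the question."""
    wtds = np.exp(-(np.arange(length) - m) ** 2 / dss)
    out = np.zeros(x.shape)
    for l in range(length - 1, len(x)):
        for i in range(length):
            # l - length + i is -1 for the first valid row, so it wraps around
            out[l] += x[l - length + i] * wtds[i]
    return out / wtds.sum()

def alma_correlate(x, length, m, dss):
    """Hypothetical vectorized variant based on correlate1d (length >= 2)."""
    wtds = np.exp(-(np.arange(length) - m) ** 2 / dss)
    # With this origin, the window for row r covers rows r-length+1 .. r
    trailing = correlate1d(x, wtds / wtds.sum(), axis=0, mode='wrap',
                           origin=(length - 1) // 2)
    out = np.zeros(x.shape)
    # The loop's window ends one row earlier, so shift the result by one row
    out[length - 1:] = trailing[length - 2:-1]
    return out

rng = np.random.default_rng(0)
x = rng.random((50, 4))
length, sigma, offset = 6, 6.0, 0.85  # asymmetric weights on purpose
m = offset * (length - 1)
dss = 2 * (length / sigma) ** 2
print(np.allclose(alma_loop(x, length, m, dss),
                  alma_correlate(x, length, m, dss)))
```

An asymmetric offset is used in the check on purpose, since a symmetric kernel would hide a flipped orientation.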

Runtime test

Approaches -

def original_app(pnp_array, length, m, dss):
    asize = length - 1
    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp( -(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]
    return alma

def vectorized_app1(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    pnp_array3D = strided_axis0(pnp_array,len(wtds))
    out = np.zeros(pnp_array.shape)
    out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
    out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
    out /= wtds.sum()
    return out

def vectorized_app2(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    return conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')

Timings -

In [470]: np.random.seed(0)
     ...: m,n = 1000,100
     ...: pnp_array = np.random.rand(m,n)
     ...: 
     ...: length = 6
     ...: sigma = 0.3
     ...: offset = 0.5
     ...: 
     ...: asize = length - 1
     ...: m = np.floor(offset * asize)
     ...: s = length  / sigma
     ...: dss = 2 * s * s
     ...: 

In [471]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
10 loops, best of 3: 36.1 ms per loop
1000 loops, best of 3: 1.84 ms per loop
1000 loops, best of 3: 684 µs per loop

In [472]: np.random.seed(0)
     ...: m,n = 10000,1000 # rest same as previous one

In [473]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
1 loop, best of 3: 503 ms per loop
1 loop, best of 3: 222 ms per loop
10 loops, best of 3: 106 ms per loop