How can I vectorize this peak-finding loop in Python?

den*_*505 6 python signal-processing numpy vectorization scipy

Basically, I'm writing a peak-finding function that needs to beat scipy.signal.argrelextrema in benchmarks. Here is a link to the data I'm using, followed by the code:

https://drive.google.com/open?id=1U-_xQRWPoyUXhQUhFgnM3ByGw-1VImKB

If this link expires, the data can be found in Dukascopy Bank's online historical data downloader.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

data = pd.read_csv('EUR_USD.csv')
data.columns = ['Date', 'open', 'high', 'low', 'close','volume']

data.Date = pd.to_datetime(data.Date, format='%d.%m.%Y %H:%M:%S.%f')

data = data.set_index(data.Date)

data = data[['open', 'high', 'low', 'close']]

data = data.drop_duplicates(keep=False)

price = data.close.values

def fft_detect(price, p=0.4):

    trans = np.fft.rfft(price)
    trans[round(p*len(trans)):] = 0
    inv = np.fft.irfft(trans)
    dy = np.gradient(inv)
    peaks_idx = np.where(np.diff(np.sign(dy)) == -2)[0] + 1
    valleys_idx = np.where(np.diff(np.sign(dy)) == 2)[0] + 1

    patt_idx = list(peaks_idx) + list(valleys_idx)
    patt_idx.sort()

    label = [x for x in np.diff(np.sign(dy)) if x != 0]

    # Look for Better Peaks

    l = 2

    new_inds = []

    for i in range(0,len(patt_idx[:-1])):

        search = np.arange(patt_idx[i]-(l+1),patt_idx[i]+(l+1))

        if label[i] == -2:
            idx = price[search].argmax()
        elif label[i] == 2:
            idx = price[search].argmin()

        new_max = search[idx]
        new_inds.append(new_max)

    plt.plot(price)
    plt.plot(inv)
    plt.scatter(patt_idx,price[patt_idx])
    plt.scatter(new_inds,price[new_inds],c='g')
    plt.show()

    return peaks_idx, price[peaks_idx]

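It basically smooths the data with a fast Fourier transform (FFT), uses the derivative to find the indices of the minima and maxima of the smoothed data, and then looks up the corresponding peaks in the unsmoothed data. Because of smoothing effects, the peaks it finds are sometimes not ideal, so I run this for loop to search for a higher or lower point around each index, within a range set by l. I need help vectorizing this for loop! I have no idea how to do it. Without the for loop, my code is about 50% faster than scipy.signal.argrelextrema, but the for loop slows it down. If I can find a way to vectorize it, it would be a very fast and effective alternative to scipy.signal.argrelextrema. The two images show the result without and with the for loop, respectively.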

[Image: before adding the 'for' loop, the peaks are not ideal]
[Image: with the 'for' loop, the peaks are much better]
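To make the sign-change step described above concrete, here is a minimal sketch on a toy array. The values in `y` are made up for illustration and are not the EUR/USD data; the calls mirror the ones in `fft_detect`, minus the FFT smoothing.

```python
import numpy as np

# Toy signal (hypothetical values). True peaks sit at indices 2 and 6,
# and the true valley sits at index 4.
y = np.array([0.0, 1.0, 3.0, 2.0, 0.5, 1.5, 4.0, 3.5])

dy = np.gradient(y)              # central differences in the interior
turns = np.diff(np.sign(dy))     # -2 where the slope flips + to -, +2 for - to +

peaks_idx = np.where(turns == -2)[0] + 1
valleys_idx = np.where(turns == 2)[0] + 1

print(peaks_idx)    # [3 7]
print(valleys_idx)  # [5]
```

In this toy case the detected positions land one sample after the true extrema, even without any smoothing. That offset is one reason a small window search around each candidate can help.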

mor*_*rty 2

Here is an alternative... it uses a list comprehension, which is usually faster than a for loop.

l = 2

# Define the bounds beforehand; it's marginally faster than doing it in the loop
upper = np.array(patt_idx) + l + 1
lower = np.array(patt_idx) - l - 1

# List comprehension...
new_inds = [price[low:hi].argmax() + low if lab == -2 else 
            price[low:hi].argmin() + low 
            for low, hi, lab in zip(lower, upper, label)]

# Find maximum within each interval
new_max = price[new_inds]
new_global_max = np.max(new_max)
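The list comprehension above still runs one Python-level iteration per candidate. A fully vectorized variant is possible with NumPy broadcasting: build every search window as one row of a 2-D index array, then take a single `argmax` along the rows. The sketch below is only one way to do it. The helper name `refine_extrema` is hypothetical, and it assumes that `label` has exactly one entry (-2 or 2) per entry of `patt_idx`, and that windows running past the ends of the array should be clipped to the edges rather than wrapping around.

```python
import numpy as np

def refine_extrema(price, patt_idx, label, l=2):
    """Vectorized window search around each candidate index (a sketch).

    price    : 1-D array of raw (unsmoothed) values
    patt_idx : candidate extremum indices found on the smoothed signal
    label    : -2 for a peak, 2 for a valley, one entry per candidate
    l        : window parameter, same meaning as in the question
    """
    price = np.asarray(price, dtype=float)
    patt_idx = np.asarray(patt_idx)
    label = np.asarray(label)

    # Same offsets as np.arange(i - (l + 1), i + (l + 1)), for all i at once.
    offsets = np.arange(-(l + 1), l + 1)
    windows = patt_idx[:, None] + offsets[None, :]

    # Assumption: clip out-of-range positions to the array edges so that
    # negative indices do not wrap around to the end of the array.
    windows = np.clip(windows, 0, len(price) - 1)

    # Negate the valley rows so one argmax call covers peaks and valleys.
    sign = np.where(label == -2, 1.0, -1.0)
    best = np.argmax(price[windows] * sign[:, None], axis=1)

    # Map the per-row position back to an index into price.
    return windows[np.arange(len(patt_idx)), best]


# Small usage example with made-up numbers.
demo_price = np.array([0, 1, 5, 2, 1, 0, -3, 1, 2, 9, 3], dtype=float)
demo_refined = refine_extrema(demo_price, [3, 5, 8], [-2, 2, -2], l=1)
print(demo_refined)  # [2 6 9]
```

Inside `fft_detect`, the loop could then be replaced by a call such as `new_inds = refine_extrema(price, patt_idx, label, l)`. If `label` might contain values other than -2 and 2 (for example where the gradient is exactly zero), one option is to build it as `np.diff(np.sign(dy))[np.array(patt_idx) - 1]` so that it stays aligned with `patt_idx`. Whether this beats the list comprehension depends on the number of candidates, so it is worth timing on the real data.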