如何在不指定周期的情况下分解数据中存在的多个周期性?

Eni*_*mAI 7 python signal-processing numpy fft autocorrelation

我试图将信号中存在的周期性分解为其各个组成部分,以计算它们的时间周期。

假设以下是我的示例信号:

在此输入图像描述

您可以使用以下代码重现信号:

t_week = np.linspace(1,480, 480)
t_weekend=np.linspace(1,192,192)
T=96 #Time Period
x_weekday = 10*np.sin(2*np.pi*t_week/T)+10
x_weekend = 2*np.sin(2*np.pi*t_weekend/T)+10
x_daily_weekly_sinu = np.concatenate((x_weekday, x_weekend)) 

#Creating the Signal
x_daily_weekly_long_sinu = np.concatenate((x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu))

#Visualization
plt.plot(x_daily_weekly_long_sinu)
plt.show()
Run Code Online (Sandbox Code Playgroud)

我的目标是将这个信号分成 3 个独立的隔离分量信号,其中包括:

  1. 天数作为期间
  2. 工作日作为期间
  3. 周末作为期间

期间如下图:

在此输入图像描述

我尝试使用statsmodel中的STL分解方法:

sm.tsa.seasonal_decompose()
Run Code Online (Sandbox Code Playgroud)

但这仅适用于您事先知道周期的情况。并且仅适用于一次分解一个周期。同时,我需要分解任何具有多个周期性且其周期事先未知的信号。

谁能帮助如何实现这一目标?

sme*_*rkd 3

您是否尝试过更多算法方法?我们可以首先尝试识别信号的变化,无论是幅度还是频率。使用一些 epsilon 识别存在重大变化的所有阈值点,然后对该窗口进行 FFT。

这是我的方法:

  1. 我发现带有 Daubechies 小波的 DWT 非常擅长这一点。当任何一个发生变化时,转换时都会出现明显的峰值,这使得识别窗口非常好。
  2. 进行了高斯混合,本质上是键入 2 个特定的窗口大小。在您的示例中,这是固定的,但对于真实数据,它可能不是。
  3. 环回应用 FFT 的峰值对并发现突出的频率。
  4. 现在您有了窗口的宽度,您可以使用它来从高斯混合与另一个 epsilon 中进行识别,以计算出窗口之间的周期,并使用 FFT 来获得该窗口内的突出频率。不过,如果我是你,我会使用混合模型来识别窗口中的关键频率或幅度。如果我们可以假设现实世界中的频率/幅度呈正态分布。

请注意,有很多方法可以解决这个问题。我想说,就个人而言,从小波变换开始是一个好的开始。

这是代码,尝试添加一些高斯噪声或其他可变性来测试它。您会发现噪声越多,DWT 的最小 epsilon 就需要越高,因此您确实需要对其进行一些调整。

import pywt
from sklearn.mixture import GaussianMixture
data = x_daily_weekly_long_sinu
times = np.linspace(0, len(data), len(data))
SAMPLING_RATE = len(data) / len(times)  # needed for frequency calc (number of discrete times / time interval) in this case it's 1
cA, cD = pywt.dwt(data, 'db4', mode='periodization')  # Daubechies wavelet good for changes in freq

# find peaks, with db4 good indicator of changes in frequencies, greater than some arbitrary value (you'll have to find by possibly plotting plt.plot(cD))
EPSILON = 0.02
peaks = (np.where(np.abs(cD) > EPSILON)[0] * 2)  # since cD (detailed coef) is len(x) / 2 only true for periodization mode
peaks = [0] + peaks.tolist() + [len(data) -1 ]  # always add start and end as beginning of windows

# iterate through peak pairs
if len(peaks) < 2:
    print('No peaks found...')
    exit(0)

# iterate through the "paired" windows
MIN_WINDOW_WIDTH = 10   # min width for the start of a new window
peak_starts = []
for i in range(len(peaks) - 1):
    s_ind, e_ind = peaks[i], peaks[i + 1]
    if len(peak_starts) > 0 and (s_ind - peak_starts[-1]) < MIN_WINDOW_WIDTH:
        continue  # not wide enough
    peak_starts.append(s_ind)

# calculate the sequential differences between windows
# essentially giving us how wide they are
seq_dist = np.array([t - s for s, t in zip(peak_starts, peak_starts[1:])])

# since peak windows might not be exact in the real world let's make a gaussian mixture
# you're assuming how many different windows there are here)
# with this assumption we're going to assume 2 different kinds of windows
WINDOW_NUM = 2
gmm = GaussianMixture(WINDOW_NUM).fit(seq_dist.reshape(-1, 1))
window_widths = [float(m) for m in gmm.means_]

# for example we would assume this prints (using your example of 2 different window types)
weekday_width, weekend_width = list(sorted(window_widths))
print('Weekday Width, Weekend Width', weekday_width, weekend_width)  # prints 191.9 and 479.59

# now we can process peak pairs with their respective windows
# we specify a padding which essentially will remove edge data which might overlap with another window (we really only care about the frequency)
freq_data = {}
PADDING = 3  # add padding to remove edge elements
WIDTH_EPSILON = 5  # make sure the window found is within the width found in gaussian mixture (to remove other small/large windows with noise)
T2_data = []
T3_data = []
for s, t in zip(peak_starts, peak_starts[1:]):
    width = t - s
    passed = False
    for testw in window_widths:
        if abs(testw - width) < WIDTH_EPSILON:
            passed = True
            break
    
    # weird window ignore it
    if not passed:
        continue

    # for your example let's populate T2 data
    if (width - weekday_width) < WIDTH_EPSILON:
        T2_data.append(s)  # append start
    elif (width - weekend_width) < WIDTH_EPSILON:
        T3_data.append(s)

    # append main frequency in window
    window = data[s + PADDING: t - PADDING]

    # get domininant frequency
    fft = np.real(np.fft.fft(window))
    fftfreq = np.fft.fftfreq(len(window))
    freq = SAMPLING_RATE * fftfreq[np.argmax(np.abs(fft[1:])) + 1]  # ignore constant (shifting) freq 0
    freq_data[int(testw)] = np.abs(freq)

print('T2 = ', np.mean([t - s for s, t in zip(T2_data, T2_data[1:])]))
print('T3 = ', np.mean([t - s for s, t in zip(T3_data, T3_data[1:])]))
print('Frequency data', freq_data)

# convert to periods
period_data = {}
for w in freq_data.keys():
    period_data[w] = 1.0 / freq_data[w]

print('Period data', period_data)
Run Code Online (Sandbox Code Playgroud)

使用打印以下内容的示例(注意结果并不准确)。

Weekday Width, Weekend Width 191.99999999999997 479.5999999999999
T2 =  672.0
T3 =  671.5555555555555
Frequency data {479: 0.010548523206751054, 191: 0.010752688172043012}
Period data {479: 94.8, 191: 92.99999999999999}
Run Code Online (Sandbox Code Playgroud)

请注意,这就是 DWT 系数的样子。 DWT系数