Eni*_*mAI 7 python signal-processing numpy fft autocorrelation
我试图将信号中存在的周期性分解为其各个组成部分,以计算它们的时间周期。
假设以下是我的示例信号:
您可以使用以下代码重现信号:
t_week = np.linspace(1,480, 480)
t_weekend=np.linspace(1,192,192)
T=96 #Time Period
x_weekday = 10*np.sin(2*np.pi*t_week/T)+10
x_weekend = 2*np.sin(2*np.pi*t_weekend/T)+10
x_daily_weekly_sinu = np.concatenate((x_weekday, x_weekend))
#Creating the Signal
x_daily_weekly_long_sinu = np.concatenate((x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu,x_daily_weekly_sinu))
#Visualization
plt.plot(x_daily_weekly_long_sinu)
plt.show()
Run Code Online (Sandbox Code Playgroud)
我的目标是将这个信号分成 3 个独立的隔离分量信号,其中包括:
期间如下图:
我尝试使用statsmodel中的STL分解方法:
sm.tsa.seasonal_decompose()
Run Code Online (Sandbox Code Playgroud)
但这仅适用于您事先知道周期的情况。并且仅适用于一次分解一个周期。同时,我需要分解任何具有多个周期性且其周期事先未知的信号。
谁能帮助如何实现这一目标?
您是否尝试过更多算法方法?我们可以首先尝试识别信号的变化,无论是幅度还是频率。使用一些 epsilon 识别存在重大变化的所有阈值点,然后对该窗口进行 FFT。
这是我的方法:
请注意,有很多方法可以解决这个问题。我想说,就个人而言,从小波变换开始是一个好的开始。
这是代码,尝试添加一些高斯噪声或其他可变性来测试它。您会发现噪声越多,DWT 的最小 epsilon 就需要越高,因此您确实需要对其进行一些调整。
import pywt
from sklearn.mixture import GaussianMixture
data = x_daily_weekly_long_sinu
times = np.linspace(0, len(data), len(data))
SAMPLING_RATE = len(data) / len(times) # needed for frequency calc (number of discrete times / time interval) in this case it's 1
cA, cD = pywt.dwt(data, 'db4', mode='periodization') # Daubechies wavelet good for changes in freq
# find peaks, with db4 good indicator of changes in frequencies, greater than some arbitrary value (you'll have to find by possibly plotting plt.plot(cD))
EPSILON = 0.02
peaks = (np.where(np.abs(cD) > EPSILON)[0] * 2) # since cD (detailed coef) is len(x) / 2 only true for periodization mode
peaks = [0] + peaks.tolist() + [len(data) -1 ] # always add start and end as beginning of windows
# iterate through peak pairs
if len(peaks) < 2:
print('No peaks found...')
exit(0)
# iterate through the "paired" windows
MIN_WINDOW_WIDTH = 10 # min width for the start of a new window
peak_starts = []
for i in range(len(peaks) - 1):
s_ind, e_ind = peaks[i], peaks[i + 1]
if len(peak_starts) > 0 and (s_ind - peak_starts[-1]) < MIN_WINDOW_WIDTH:
continue # not wide enough
peak_starts.append(s_ind)
# calculate the sequential differences between windows
# essentially giving us how wide they are
seq_dist = np.array([t - s for s, t in zip(peak_starts, peak_starts[1:])])
# since peak windows might not be exact in the real world let's make a gaussian mixture
# you're assuming how many different windows there are here)
# with this assumption we're going to assume 2 different kinds of windows
WINDOW_NUM = 2
gmm = GaussianMixture(WINDOW_NUM).fit(seq_dist.reshape(-1, 1))
window_widths = [float(m) for m in gmm.means_]
# for example we would assume this prints (using your example of 2 different window types)
weekday_width, weekend_width = list(sorted(window_widths))
print('Weekday Width, Weekend Width', weekday_width, weekend_width) # prints 191.9 and 479.59
# now we can process peak pairs with their respective windows
# we specify a padding which essentially will remove edge data which might overlap with another window (we really only care about the frequency)
freq_data = {}
PADDING = 3 # add padding to remove edge elements
WIDTH_EPSILON = 5 # make sure the window found is within the width found in gaussian mixture (to remove other small/large windows with noise)
T2_data = []
T3_data = []
for s, t in zip(peak_starts, peak_starts[1:]):
width = t - s
passed = False
for testw in window_widths:
if abs(testw - width) < WIDTH_EPSILON:
passed = True
break
# weird window ignore it
if not passed:
continue
# for your example let's populate T2 data
if (width - weekday_width) < WIDTH_EPSILON:
T2_data.append(s) # append start
elif (width - weekend_width) < WIDTH_EPSILON:
T3_data.append(s)
# append main frequency in window
window = data[s + PADDING: t - PADDING]
# get domininant frequency
fft = np.real(np.fft.fft(window))
fftfreq = np.fft.fftfreq(len(window))
freq = SAMPLING_RATE * fftfreq[np.argmax(np.abs(fft[1:])) + 1] # ignore constant (shifting) freq 0
freq_data[int(testw)] = np.abs(freq)
print('T2 = ', np.mean([t - s for s, t in zip(T2_data, T2_data[1:])]))
print('T3 = ', np.mean([t - s for s, t in zip(T3_data, T3_data[1:])]))
print('Frequency data', freq_data)
# convert to periods
period_data = {}
for w in freq_data.keys():
period_data[w] = 1.0 / freq_data[w]
print('Period data', period_data)
Run Code Online (Sandbox Code Playgroud)
使用打印以下内容的示例(注意结果并不准确)。
Weekday Width, Weekend Width 191.99999999999997 479.5999999999999
T2 = 672.0
T3 = 671.5555555555555
Frequency data {479: 0.010548523206751054, 191: 0.010752688172043012}
Period data {479: 94.8, 191: 92.99999999999999}
Run Code Online (Sandbox Code Playgroud)