在 Python 中去除信号中的尖峰

wie*_*nYo 10 signal-processing pandas

例如,我有一个来自呼吸记录的信号,其中有很多由于打哈欠而产生的尖峰。我尝试使用 pandas 的滚动平均函数删除它,但没有帮助。该图上的绿色空间是使用滚动平均值的结果。

import pandas as pd

RESP=pd.DataFrame(RESP)
RESP_AV=pd.rolling_mean(RESP,50)
Run Code Online (Sandbox Code Playgroud)

我对过滤数据不太了解,而且我在 pandas 中找不到任何其他方法来消除这个尖峰,所以我的问题是在哪里寻找答案。RESP.head() 的结果是:

 0 -2562.863389
1 -2035.020403
2 -2425.538355
3 -2554.280563
4 -2242.438367
6.7636961937
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

Opp*_*ppy 10

这是从数据中消除尖峰的通用方法。代码位于本文末尾。需要针对数据调整的变量是大写的。

示例数据集是具有随机尖峰的正弦波。我将噪声数据集称为y_spikey

输入数据集

  1. 剪辑数据 - 用 np.nan 替换 HIGH_CUT 以上和 LOW_CUT 以下的数据。我将此数据集称为y_clipped

  2. 计算剪裁数据的前向-后向指数加权移动平均值 (FBEWMA)。我将此数据集称为y_ewma_fb。上一步裁剪数据有助于将该曲线拟合到剩余数据。变量 SPAN 调整平均窗口的长度,并且应该根据您的数据进行调整。这里有FBEWMA的解释:Exponential Smoothing Average

y_clipped 和 y_ewma_fb

  1. 将 FBEWMA 数据中的 DELTA 剪辑数据替换为 np.nan。我将此数据集称为y_remove_outliers

y_remove_outliers 和 y_ewma_fb

  1. 使用 pd.interpolate() 对 y_remove_outliers 中的缺失值进行插值。我将此数据集称为y_interpolated。这是您的输出数据集。

y_spikey 和 y_interpolated

代码:

import logging
import numpy as np
import pandas as pd

logging.basicConfig(datefmt='%H:%M:%S',
                    stream=sys.stdout, level=logging.DEBUG,
                    format='%(asctime)s %(message)s')

# Distance away from the FBEWMA that data should be removed.
DELTA = 0.1

# clip data above this value:
HIGH_CLIP = 2.1

# clip data below this value:
LOW_CLIP = -2.1

# random values above this trigger a spike:
RAND_HIGH = 0.98

# random values below this trigger a negative spike:
RAND_LOW = 0.02

# How many samples to run the FBEWMA over.
SPAN = 10

# spike amplitude
SPIKE = 2


def clip_data(unclipped, high_clip, low_clip):
    ''' Clip unclipped between high_clip and low_clip. 
    unclipped contains a single column of unclipped data.'''
    
    # convert to np.array to access the np.where method
    np_unclipped = np.array(unclipped)
    # clip data above HIGH_CLIP or below LOW_CLIP
    cond_high_clip = (np_unclipped > HIGH_CLIP) | (np_unclipped < LOW_CLIP)
    np_clipped = np.where(cond_high_clip, np.nan, np_unclipped)
    return np_clipped.tolist()


def create_sample_data():
    ''' Create sine wave, amplitude +/-2 with random spikes. '''
    x = np.linspace(0, 2*np.pi, 1000)
    y = 2 * np.sin(x)
    df = pd.DataFrame(list(zip(x,y)), columns=['x', 'y'])
    df['rand'] = np.random.random_sample(len(x),)
    # create random positive and negative spikes
    cond_spike_high = (df['rand'] > RAND_HIGH)
    df['spike_high'] = np.where(cond_spike_high, SPIKE, 0)
    cond_spike_low = (df['rand'] < RAND_LOW)
    df['spike_low'] = np.where(cond_spike_low, -SPIKE, 0)
    df['y_spikey'] = df['y'] + df['spike_high'] + df['spike_low']
    return df


def ewma_fb(df_column, span):
    ''' Apply forwards, backwards exponential weighted moving average (EWMA) to df_column. '''
    # Forwards EWMA.
    fwd = pd.Series.ewm(df_column, span=span).mean()
    # Backwards EWMA.
    bwd = pd.Series.ewm(df_column[::-1],span=10).mean()
    # Add and take the mean of the forwards and backwards EWMA.
    stacked_ewma = np.vstack(( fwd, bwd[::-1] ))
    fb_ewma = np.mean(stacked_ewma, axis=0)
    return fb_ewma
    
    
def remove_outliers(spikey, fbewma, delta):
    ''' Remove data from df_spikey that is > delta from fbewma. '''
    np_spikey = np.array(spikey)
    np_fbewma = np.array(fbewma)
    cond_delta = (np.abs(np_spikey-np_fbewma) > delta)
    np_remove_outliers = np.where(cond_delta, np.nan, np_spikey)
    return np_remove_outliers

    
def main():
    df = create_sample_data()

    df['y_clipped'] = clip_data(df['y_spikey'].tolist(), HIGH_CLIP, LOW_CLIP)
    df['y_ewma_fb'] = ewma_fb(df['y_clipped'], SPAN)
    df['y_remove_outliers'] = remove_outliers(df['y_clipped'].tolist(), df['y_ewma_fb'].tolist(), DELTA)
    df['y_interpolated'] = df['y_remove_outliers'].interpolate()
    
    ax = df.plot(x='x', y='y_spikey', color='blue', alpha=0.5)
    ax2 = df.plot(x='x', y='y_interpolated', color='black', ax=ax)
    
main()

Run Code Online (Sandbox Code Playgroud)