jon*_*ser 5 python signal-processing machine-learning curve-fitting scipy
我有一个相当奇怪的问题。我目前正在处理时间序列数据,并且数据集中有几个峰值。该数据是使用中子密度测井机收集的,它描述了传感器在一段时间内连续记录的事件。数据中的峰值对应于该机器沿钻孔下行时的一些有趣的间隔。所以说,巅峰很重要。然而,重要的不仅仅是峰值。这是整个间隔(或者至少按照我的描述;参见我的附图)。现在我的问题是,是否有一种我可以使用或研究的信号处理(最好是Python)方法,可以让我将此信号分为不同的区间,每个区间对应于局部最小值/最大值?
我最初的方法是使用 Kleinberg 2002 所描述的突发检测算法,但我没有成功,所以我想征求其他人对此的意见。
这是原始数据:

这就是我想做的:

由于原始数据不可用,我生成了一些测试数据。方法如下。如果数据是一组具有跳跃的平台,则检查移动窗口上的标准偏差将给出跳跃处的峰值。用阈值隔离峰值。通过查看应用移动平均线后的数据来估计跳跃。这为安装提供了良好的起点。作为适合功能,我再次使用我最喜欢的tanh.
结果如下:
import matplotlib.pyplot as plt
import numpy as np
from operator import itemgetter
from itertools import groupby
from scipy.optimize import leastsq
def moving_average( data, windowsize, **kwargs ):
mode=kwargs.pop('mode', 'valid')
weight = np.ones( windowsize )/ float( windowsize )
return np.convolve( data, weight, mode=mode)
def moving_sigma( data, windowsize ):
l = len( data )
sout = list()
for i in range( l - windowsize + 1 ):
sout += [ np.std( data[ i : i + windowsize ] ) ]
return np.array( sout )
def m_step( x, s, x0List, ampList, off ):
assert len( x0List ) == len( ampList )
out = [ a * 0.5 * ( 1 + np.tanh( s * (x - x0) ) ) for a, x0 in zip( ampList, x0List )]
return sum( out ) + off
def residuals( params, xdata, ydata ):
assert not len(params) % 2
off = params[0]
s = params[1]
rest = params[2:]
n = len( rest )
x0 = rest[:n/2]
am = rest[n/2:]
diff = [ y - m_step( x,s, x0, am, off ) for x, y in zip( xdata, ydata ) ]
return diff
### generate data
np.random.seed(776)
a=0
data = list()
for i in range(15):
a = 50 * ( 1 - 2 * np.random.random() )
for _ in range ( np.random.randint( 5, 35 ) ):
data += [a]
xx = np.arange( len( data ), dtype=np.float )
noisydata = list()
for x in data:
noisydata += [ x + np.random.normal( scale=5 ) ]
###define problem specific parameters
myWindow = 10
thresh = 8
cutoffscale = 1.1
### data treatment
avdata = moving_average( noisydata, myWindow )
avx = moving_average( xx, myWindow )
sdata = moving_sigma( noisydata, myWindow )
### getting start values for the fit
seq = [x for x in np.where( sdata > thresh )[0] ]
# from /sf/answers/220464541/
jumps = [ map( itemgetter(1), g ) for k, g in groupby( enumerate( seq ), lambda ( i, x ) : i - x ) ]
xjumps = [ [ avx[ k ] for k in xList ] for xList in jumps ]
means = [ np.mean( x ) for x in xjumps ]
### the delta is too small as one only looks above thresh so lets increase it by guessing
deltas = [ ( avdata[ x[-1] ] - avdata[ x[0] ] ) * cutoffscale for x in jumps ]
guess = [ avdata[0], 2] + means + deltas
### fitting
sol, err = leastsq( residuals, guess, args=( xx, noisydata ), maxfev=10000 )
fittedoff = sol[0]
fitteds = sol[1]
fittedx0 = sol[ 2 : 2 + len( means ) ]
fittedam = sol[ 2 + len( means ) : ]
### plotting
fig = plt.figure()
ax = fig.add_subplot( 1, 1, 1 )
# ~ ax.plot( data )
ax.plot( xx, noisydata, label='noisy data' )
ax.plot( avx, avdata, label='moving average' )
ax.plot( avx, sdata, label='window sigma' )
for j in means:
ax.axvline( j, c='#606060', linestyle=':' )
yy = [ m_step( x, 2, means, deltas, avdata[0] ) for x in xx]
bestyy = [ m_step( x, fitteds, fittedx0, fittedam, fittedoff ) for x in xx ]
ax.plot( xx, yy, label='guess' )
ax.plot( xx, bestyy, label='fit' )
plt.legend( loc=0 )
plt.show()
Run Code Online (Sandbox Code Playgroud)
给出了下图:
一些较小的跳跃未被检测到,但应该清楚这是预期的。而且,整个过程并不是很快。由于阈值的原因,x存在的跳跃越多,初始猜测就会变得越差。最后,没有检测到一系列小跳跃,因此数据中的所得斜率被拟合为具有明显大误差的大平台。人们可以为此引入一项额外的检查,并添加跳跃位置以适应相应的情况。
另请注意,窗口大小的选择决定了两次跳跃将被检测为单独跳跃的距离。