8on*_*ne6 20 python interpolation numpy scipy
假设我有一个山上3个(已知)高度的气象站的数据.具体而言,每个站每分钟记录一次温度测量.我有两种插值我想要执行.而且我希望能够快速完成每一项工作.
所以让我们设置一些数据:
import numpy as np
from scipy.interpolate import interp1d
import pandas as pd
import seaborn as sns
np.random.seed(0)
N, sigma = 1000., 5
basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
alltemps = np.array([basetemps, midtemps, toptemps]).T # note transpose!
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]
altitudes = np.array([500, 1500, 4000]).astype(float)
finaltemps = pd.DataFrame(alltemps + trend, columns=altitudes)
finaltemps.index.names, finaltemps.columns.names = ['Time'], ['Altitude']
finaltemps.plot()
Run Code Online (Sandbox Code Playgroud)
我认为这个非常简单.假设我想每次将温度控制在1000海拔高度.我可以使用内置的scipy插值方法:
interping_function = interp1d(altitudes, finaltemps.values)
interped_to_1000 = interping_function(1000)
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_to_1000, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)
Run Code Online (Sandbox Code Playgroud)
这很好用.让我们来看看速度:
%%timeit
res = interp1d(altitudes, finaltemps.values)(1000)
#-> 1000 loops, best of 3: 207 µs per loop
Run Code Online (Sandbox Code Playgroud)
所以现在我有第二个相关的问题.假设我知道徒步旅行的高度是时间的函数,我想通过随时间线性插入我的数据来计算他们(移动)位置的温度. 特别是,我知道远足队的位置的时间与我知道气象站的温度相同. 我可以毫不费力地做到这一点:
location = np.linspace(altitudes[0], altitudes[-1], N)
interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc)
for i, loc in enumerate(location)])
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_along_path, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)
Run Code Online (Sandbox Code Playgroud)
所以这很好用,但重要的是要注意上面的关键是使用列表理解来隐藏大量的工作.在前一种情况下,scipy为我们创建单个插值函数,并在大量数据上对其进行一次评估.在这种情况下,scipy实际上是构建N单独的插值函数并在少量数据上评估每一次.这感觉本身效率低下.这里潜伏着一个for循环(在列表理解中)而且,这只是感觉松弛.
毫不奇怪,这比前一种情况要慢得多:
%%timeit
res = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc)
for i, loc in enumerate(location)])
#-> 10 loops, best of 3: 145 ms per loop
Run Code Online (Sandbox Code Playgroud)
所以第二个例子比第一个例子慢1000.也就是说,重物提升是"制作线性插值函数"的步骤......在第二个例子中发生了1000次,但在第一个例子中只发生了一次.
所以,问题是:有没有更好的方法来解决第二个问题? 例如,有没有一种很好的方法来设置2维插值(这可能会处理远程方位置已知的时间不是采样温度的时间)?还是有一种特别灵巧的方式来处理时间排队的事情?或其他?
小智 11
两个值之间y1,y2位置x1和x2相对于点的线性插值xi简单地说是:
yi = y1 + (y2-y1) * (xi-x1) / (x2-x1)
Run Code Online (Sandbox Code Playgroud)
使用一些矢量化Numpy表达式,我们可以从数据集中选择相关点并应用上述函数:
I = np.searchsorted(altitudes, location)
x1 = altitudes[I-1]
x2 = altitudes[I]
time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]
xI = location
yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
Run Code Online (Sandbox Code Playgroud)
麻烦的是,有些点位于已知范围的边界(甚至在已知范围之外),应该考虑到这一点:
I = np.searchsorted(altitudes, location)
same = (location == altitudes.take(I, mode='clip'))
out_of_range = ~same & ((I == 0) | (I == altitudes.size))
I[out_of_range] = 1 # Prevent index-errors
x1 = altitudes[I-1]
x2 = altitudes[I]
time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]
xI = location
yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
yI[out_of_range] = np.nan
Run Code Online (Sandbox Code Playgroud)
幸运的是,Scipy已经提供了ND插值,这也很容易处理不匹配的时间,例如:
from scipy.interpolate import interpn
time = np.arange(len(alltemps))
M = 150
hiketime = np.linspace(time[0], time[-1], M)
location = np.linspace(altitudes[0], altitudes[-1], M)
xI = np.column_stack((hiketime, location))
yI = interpn((time, altitudes), alltemps, xI)
Run Code Online (Sandbox Code Playgroud)
这是一个基准代码(没有任何pandas实际情况,我确实包含了其他答案的解决方案):
import numpy as np
from scipy.interpolate import interp1d, interpn
def original():
return np.array([interp1d(altitudes, alltemps[i, :])(loc)
for i, loc in enumerate(location)])
def OP_self_answer():
return np.diagonal(interp1d(altitudes, alltemps)(location))
def interp_checked():
I = np.searchsorted(altitudes, location)
same = (location == altitudes.take(I, mode='clip'))
out_of_range = ~same & ((I == 0) | (I == altitudes.size))
I[out_of_range] = 1 # Prevent index-errors
x1 = altitudes[I-1]
x2 = altitudes[I]
time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]
xI = location
yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
yI[out_of_range] = np.nan
return yI
def scipy_interpn():
time = np.arange(len(alltemps))
xI = np.column_stack((time, location))
yI = interpn((time, altitudes), alltemps, xI)
return yI
N, sigma = 1000., 5
basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]
alltemps = np.array([basetemps, midtemps, toptemps]).T + trend
altitudes = np.array([500, 1500, 4000], dtype=float)
location = np.linspace(altitudes[0], altitudes[-1], N)
funcs = [original, interp_checked, scipy_interpn]
for func in funcs:
print(func.func_name)
%timeit func()
from itertools import combinations
outs = [func() for func in funcs]
print('Output allclose:')
print([np.allclose(out1, out2) for out1, out2 in combinations(outs, 2)])
Run Code Online (Sandbox Code Playgroud)
在我的系统上有以下结果:
original
10 loops, best of 3: 184 ms per loop
OP_self_answer
10 loops, best of 3: 89.3 ms per loop
interp_checked
1000 loops, best of 3: 224 µs per loop
scipy_interpn
1000 loops, best of 3: 1.36 ms per loop
Output allclose:
[True, True, True, True, True, True]
Run Code Online (Sandbox Code Playgroud)
interpn与最快的方法相比,Scipy 在速度方面受到一定程度的影响,但由于它的通用性和易用性,它绝对是最佳选择.
对于固定时间点,您可以使用以下插值函数:
g(a) = cc[0]*abs(a-aa[0]) + cc[1]*abs(a-aa[1]) + cc[2]*abs(a-aa[2])
Run Code Online (Sandbox Code Playgroud)
其中a是徒步旅行者的高度,aa与所述3测量的向量altitudes和cc是与所述系数的矢量.有三点需要注意:
alltemps)aa,cc可以通过求解线性矩阵方程来确定np.linalg.solve().g(a)对于(N,)维a和(N,3)维cc(np.linalg.solve()分别包括),易于矢量化.g(a)被称为一阶单变量样条核(三点).使用abs(a-aa[i])**(2*d-1)会将样条线顺序更改为d.这种方法可以解释为机器学习中高斯过程的简化版本.所以代码是:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
# generate temperatures
np.random.seed(0)
N, sigma = 1000, 5
trend = np.sin(4 / N * np.arange(N)) * 30
alltemps = np.array([tmp0 + trend + sigma*np.random.randn(N)
for tmp0 in [70, 50, 40]])
# generate attitudes:
altitudes = np.array([500, 1500, 4000]).astype(float)
location = np.linspace(altitudes[0], altitudes[-1], N)
def doit():
""" do the interpolation, improved version for speed """
AA = np.vstack([np.abs(altitudes-a_i) for a_i in altitudes])
# This is slighty faster than np.linalg.solve(), because AA is small:
cc = np.dot(np.linalg.inv(AA), alltemps)
return (cc[0]*np.abs(location-altitudes[0]) +
cc[1]*np.abs(location-altitudes[1]) +
cc[2]*np.abs(location-altitudes[2]))
t_loc = doit() # call interpolator
# do the plotting:
fg, ax = plt.subplots(num=1)
for alt, t in zip(altitudes, alltemps):
ax.plot(t, label="%d feet" % alt, alpha=.5)
ax.plot(t_loc, label="Interpolation")
ax.legend(loc="best", title="Altitude:")
ax.set_xlabel("Time")
ax.set_ylabel("Temperature")
fg.canvas.draw()
Run Code Online (Sandbox Code Playgroud)
测量时间给出:
In [2]: %timeit doit()
10000 loops, best of 3: 107 µs per loop
Run Code Online (Sandbox Code Playgroud)
更新:我将原始列表推导替换doit()
为导入速度30%(For N=1000).
此外,根据要求进行比较,@ moarningsun在我的机器上的基准代码块:
10 loops, best of 3: 110 ms per loop
interp_checked
10000 loops, best of 3: 83.9 µs per loop
scipy_interpn
1000 loops, best of 3: 678 µs per loop
Output allclose:
[True, True, True]
Run Code Online (Sandbox Code Playgroud)
请注意,这N=1000是一个相对较小的数字.使用N=100000产生结果:
interp_checked
100 loops, best of 3: 8.37 ms per loop
%timeit doit()
100 loops, best of 3: 5.31 ms per loop
Run Code Online (Sandbox Code Playgroud)
这表明这种方法N比interp_checked方法更好地扩展.