Rog*_*ger 10 python validation time-series scikit-learn
我希望对我的时间序列数据执行前向验证。存在关于如何执行滚动窗口的大量文档:
或扩展窗口
但此验证与我的生产系统中的情况并不对应:我想每天重新训练一个模型,该模型将对未来 14 天进行预测。因此,每次我只会在之前的训练周期上增加一天的数据(而其他方法在下一个训练折叠中会一次增加一整组长度为 test_size 的数据;在我的例子中为 14 天)。因此,我想用滑动窗口验证我的模型:
我的问题是我找不到可以完成这项工作的 Python 库。sklearn 的 TimeSeriesSplit 没有此类选项。基本上我想提供 test_size、n_fold 和 min_train_size;如果 n_fold > (n_samples - min_train_size) % test_size,那么下一个 training_set 就从上一个折叠的 test_set 中提取数据。
看来您的要求是让测试集的大小超过 1 个折叠。要做到这一点,您需要调整这些行。
我已经进行了这些更改,并添加了一个名为 n_test_folds 的新参数,以便可以对其进行自定义。
from sklearn.model_selection._split import TimeSeriesSplit
from sklearn.utils.validation import _deprecate_positional_args
from sklearn.utils import indexable
from sklearn.utils.validation import _num_samples
class WindowedTestTimeSeriesSplit(TimeSeriesSplit):
    """Time-series cross-validator whose test window spans several folds.

    The samples are cut into ``n_splits + n_test_folds`` consecutive folds.
    Every split trains on the samples that precede the test window and tests
    on the next ``n_test_folds`` folds. Successive test windows advance by a
    single fold, so they overlap whenever ``n_test_folds > 1``.

    Parameters
    ----------
    n_splits : int, default=5
        Number of splits; forwarded to ``TimeSeriesSplit``.
    max_train_size : int, default=None
        Maximum size of a single training set; forwarded to
        ``TimeSeriesSplit``.
    n_test_folds : int, default=1
        Number of folds to be used as testing at each iteration.
    """

    @_deprecate_positional_args
    def __init__(self, n_splits=5, *, max_train_size=None, n_test_folds=1):
        super().__init__(n_splits, max_train_size=max_train_size)
        # Stored untouched; validation happens in split().
        self.n_test_folds = n_test_folds

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.

        Yields
        ------
        train : ndarray
            The training set indices for that split.
        test : ndarray
            The testing set indices for that split.

        Raises
        ------
        ValueError
            If ``n_test_folds`` is smaller than 1, or if
            ``n_splits + n_test_folds`` exceeds the number of samples.
            The error surfaces when the generator is first advanced.
        """
        # Imported here so the class does not rely on the surrounding module
        # having imported numpy under the name ``np``.
        import numpy as np

        X, y, groups = indexable(X, y, groups)
        n_samples = _num_samples(X)
        n_test_folds = self.n_test_folds
        if n_test_folds < 1:
            # A non-positive value would produce empty test windows.
            raise ValueError(
                "n_test_folds must be at least 1, got {0}.".format(
                    n_test_folds))
        n_folds = self.n_splits + n_test_folds
        if n_folds > n_samples:
            raise ValueError(
                ("Cannot have number of folds ={0} greater"
                 " than the number of samples: {1}.").format(n_folds,
                                                             n_samples))
        indices = np.arange(n_samples)
        fold_size = n_samples // n_folds
        test_size = fold_size * n_test_folds  # test window
        # The remainder samples are absorbed by the first training set.
        first_test_start = fold_size + n_samples % n_folds
        # Step by fold_size (not test_size) so consecutive windows slide by
        # one fold; stop where a full test window no longer fits.
        test_starts = range(first_test_start,
                            n_samples - test_size + 1,
                            fold_size)
        for test_start in test_starts:
            if self.max_train_size and self.max_train_size < test_start:
                # Keep only the most recent max_train_size samples.
                train = indices[test_start - self.max_train_size:test_start]
            else:
                train = indices[:test_start]
            yield train, indices[test_start:test_start + test_size]
Run Code Online (Sandbox Code Playgroud)
例子:
import numpy as np
# Six two-feature samples with targets 1..6, in time order.
X = np.tile([[1, 2], [3, 4]], (3, 1))
y = np.arange(1, 7)
tscv = WindowedTestTimeSeriesSplit(n_test_folds=2, n_splits=4)
print(tscv)
for train_index, test_index in tscv.split(X):
    print("TRAIN:", train_index, "TEST:", test_index)
    X_train, y_train = X[train_index], y[train_index]
    X_test, y_test = X[test_index], y[test_index]
# Expected output:
# WindowedTestTimeSeriesSplit(max_train_size=None, n_splits=4, n_test_folds=2)
# TRAIN: [0] TEST: [1 2]
# TRAIN: [0 1] TEST: [2 3]
# TRAIN: [0 1 2] TEST: [3 4]
# TRAIN: [0 1 2 3] TEST: [4 5]
Run Code Online (Sandbox Code Playgroud)
注意:TRAIN: [0 1 2 3 4] TEST: [5] 未生成,因为它不满足测试折叠数量(n_test_folds)的要求。
使用这个函数,我们可以可视化交叉验证(CV)的各个划分。
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
# Seed the global NumPy RNG so the random feature matrix is reproducible.
np.random.seed(1338)
# NOTE(review): plot_cv_indices is not defined in this snippet; the colormaps
# are kept as module-level names in case that helper reads them — confirm.
cmap_data = plt.cm.Paired
cmap_cv = plt.cm.coolwarm
n_splits = 4
# Generate the class/group data
n_points = 100
X = np.random.randn(100, 10)
percentiles_classes = [.1, .3, .6]
# One class label per percentile entry, repeated int(100 * perc) times.
class_counts = [int(100 * perc) for perc in percentiles_classes]
y = np.repeat(np.arange(len(percentiles_classes)), class_counts)
# Evenly spaced groups repeated once
groups = np.repeat(np.arange(10), 10)
cv = WindowedTestTimeSeriesSplit(n_splits=n_splits, n_test_folds=2)
fig, ax = plt.subplots()
plot_cv_indices(cv, X, y, groups, ax, n_splits)
plt.show()
Run Code Online (Sandbox Code Playgroud)
这是我的解决方案,允许用户指定测试范围和训练的最小数据样本:
from sklearn.model_selection import TimeSeriesSplit
from sklearn.utils import indexable
from sklearn.utils.validation import _num_samples
class TimeSeriesSplitCustom(TimeSeriesSplit):
    """Time-series splitter with a fixed test horizon and a minimum train size.

    Parameters
    ----------
    n_splits : int, default=5
        Number of splits; forwarded to ``TimeSeriesSplit``.
    max_train_size : int, default=None
        Maximum size of a single training set; forwarded to
        ``TimeSeriesSplit``.
    test_size : int, default=1
        Number of samples in every test window built by
        ``overlapping_split``.
    min_train_size : int, default=1
        Minimum number of samples that precede the first test window built
        by ``overlapping_split``.
    """

    def __init__(self, n_splits=5, max_train_size=None,
                 test_size=1,
                 min_train_size=1):
        super().__init__(n_splits=n_splits, max_train_size=max_train_size)
        # NOTE(review): newer scikit-learn releases appear to give
        # TimeSeriesSplit its own ``test_size`` attribute, which this
        # assignment would overwrite — confirm against the installed version.
        self.test_size = test_size
        self.min_train_size = min_train_size

    def overlapping_split(self, X, y=None, groups=None):
        """Generate train/test indices, overlapping test windows if needed.

        When ``n_splits`` windows of ``test_size`` samples fit after the
        first ``min_train_size`` samples without overlapping, the splits of
        ``TimeSeriesSplit.split`` are yielded instead. Otherwise ``n_splits``
        test windows of ``test_size`` samples are spread evenly so that the
        last one ends on the final sample, and consecutive windows overlap.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : array-like of shape (n_samples,)
            Ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Ignored, exists for compatibility.

        Yields
        ------
        train : ndarray
            The training set indices for that split.
        test : ndarray
            The testing set indices for that split.

        Raises
        ------
        ValueError
            If ``test_size`` is smaller than 1, or if there are too few
            samples to shift the test window by at least one sample between
            splits. The error surfaces when the generator is first advanced.
        """
        # Imported here so the class does not rely on the surrounding module
        # having imported numpy under the name ``np``.
        import numpy as np

        min_train_size = self.min_train_size
        test_size = self.test_size
        n_splits = self.n_splits
        n_samples = _num_samples(X)
        if test_size < 1:
            raise ValueError(
                "test_size must be at least 1, got {0}.".format(test_size))
        if (n_samples - min_train_size) / test_size >= n_splits:
            print('(n_samples - min_train_size) / test_size >= n_splits')
            print('default TimeSeriesSplit.split() used')
            yield from super().split(X)
            return

        # Distance between the starts of two consecutive test windows.
        shift = (n_samples - test_size - min_train_size) // (n_splits - 1)
        if shift < 1:
            # A zero step makes range() fail with an unrelated message and a
            # negative step walks backwards, so reject both explicitly.
            raise ValueError(
                ("Not enough samples ({0}) for {1} overlapping splits with"
                 " test_size={2} and min_train_size={3}.").format(
                     n_samples, n_splits, test_size, min_train_size))
        # Because shift is a floor, the first window never starts before
        # min_train_size, so no separate check on start_test is needed.
        start_test = n_samples - test_size - (n_splits - 1) * shift
        test_starts = range(start_test, n_samples - test_size + 1, shift)
        indices = np.arange(n_samples)
        for test_start in test_starts:
            if self.max_train_size and self.max_train_size < test_start:
                # Keep only the most recent max_train_size samples.
                train = indices[test_start - self.max_train_size:test_start]
            else:
                train = indices[:test_start]
            yield train, indices[test_start:test_start + test_size]
Run Code Online (Sandbox Code Playgroud)
并通过可视化:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from ModelEvaluation import TimeSeriesSplitCustom
# Seed the global NumPy RNG so the random feature matrix is reproducible.
np.random.seed(1338)
# NOTE(review): plot_cv_indices is not defined in this snippet; the colormaps
# are kept as module-level names in case that helper reads them — confirm.
cmap_data = plt.cm.Paired
cmap_cv = plt.cm.coolwarm
n_splits = 13
# Generate the class/group data
n_points = 100
X = np.random.randn(100, 10)
percentiles_classes = [.1, .3, .6]
# One class label per percentile entry, repeated int(100 * perc) times.
class_counts = [int(100 * perc) for perc in percentiles_classes]
y = np.repeat(np.arange(len(percentiles_classes)), class_counts)
# Evenly spaced groups repeated once
groups = np.repeat(np.arange(10), 10)
cv = TimeSeriesSplitCustom(min_train_size=12, test_size=20, n_splits=n_splits)
fig, ax = plt.subplots()
# The accompanying text asks for plot_cv_indices to iterate
# cv.overlapping_split(...) so that the custom windows are drawn.
plot_cv_indices(cv, X, y, groups, ax, n_splits)
plt.show()
Run Code Online (Sandbox Code Playgroud)
(要获得相同的结果,请确保将 plot_cv_indices 函数中的循环改为:
for ii, (tr, tt) in enumerate(cv.overlapping_split(X=X, y=y, groups=group)):
)
干杯!
归档时间: |
|
查看次数: |
11790 次 |
最近记录: |