使用 Pyspark 的交叉验证指标

And*_*ado 5 cross-validation apache-spark pyspark apache-spark-mllib

当我们进行 k 折交叉验证时,我们正在测试模型在预测它从未见过的数据时的表现。

如果将我的数据集分成 90% 的训练和 10% 的测试并分析模型性能,则不能保证我的测试集不只包含 10% 的“最容易”或“最难”的预测点。

通过进行 10 折交叉验证,我可以确保每个点至少会用于训练一次。由于(在这种情况下)模型将被测试 10 次,我们可以对这些测试指标进行分析,这将使我们更好地了解模型在分类新数据方面的表现。

当目的应该是模型检查时,Spark 文档将交叉验证称为优化算法超参数的一种方式。

通过做这个:

lr = LogisticRegression(maxIter=10, tol=1E-4)
ovr = OneVsRest(classifier=lr)
pipeline = Pipeline(stages=[... , ovr])

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=10)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)
Run Code Online (Sandbox Code Playgroud)

我能够获得(在我的理解中)具有paramGrid 中定义的最佳参数集的模型。我理解这种超参数调整的价值,但我想要的是分析模型性能,而不仅仅是获得最佳模型。

问题是(对于这种情况下的 10 倍交叉验证):

是否可以使用 CrossValidator 为 10 个测试中的每一个(或每个指标的这 10 个测试的平均值)提取指标(f1、精度、召回率等)?,. 是否可以使用 CrossValidator 进行模型检查而不是模型选择?

谢谢!


更新


正如user10465355在评论中所述,可以在此处找到类似的问题。第一个建议是在拟合之前将 collectSubModels 设置为 true ,这会引发一个错误,指出该关键字不存在(老实说,我没有花很多时间试图找出原因)。

用户Mack在他的回答中提供了一种解决方法来打印出中间训练结果。使用他提供的方法可以打印评估指标的中间结果。由于我想提取精度、召回率、f1 和混淆矩阵的中间结果,我对他实现的方法做了一些更改:

TestResult = collections.namedtuple("TestResult", ["params", "metrics"])

class CrossValidatorVerbose(CrossValidator):

    def _fit(self, dataset):
        folds = []
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)

        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()
        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds

        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        metrics = [0.0] * numModels

        for i in range(nFolds):
            folds.append([])
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)
            train = df.filter(~condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)
                # TODO: duplicate evaluator to take extra params from input
                prediction = model.transform(validation, paramMap)
                metric = eva.evaluate(prediction)
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum
                print("params: %s\t%s: %f\tavg: %f" % (
                    {param.name: val for (param, val) in paramMap.items()},
                    metricName, metric, avgSoFar))
                
                predictionLabels = prediction.select("prediction", "label")
                allMetrics = MulticlassMetrics(predictionLabels.rdd)
                folds[i].append(TestResult(paramMap.items(), allMetrics))
                

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        bestParams = epm[bestIndex]
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % (
            {param.name: val for (param, val) in bestParams.items()},
            metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics)), folds
Run Code Online (Sandbox Code Playgroud)

要使用它,只需将 CrossValidator 方法替换为 CrossValidatorVerbose 并在进行模型拟合时执行以下操作:

cvModel, folds  = crossval.fit(df)
Run Code Online (Sandbox Code Playgroud)

要打印特定折叠的指标(具有第一组超参数的第一折叠):

def printMetrics(metrics, df):
    labels = df.rdd.map(lambda lp: lp.label).distinct().collect()
    for label in sorted(labels):
        print("Class %s precision = %s" % (label, metrics.precision(label)))
        print("Class %s recall = %s" % (label, metrics.recall(label)))
        print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))
        print ""

    # Weighted stats
    print("Weighted recall = %s" % metrics.weightedRecall)
    print("Weighted precision = %s" % metrics.weightedPrecision)
    print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
    print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
    print("Accuracy = %s" % metrics.accuracy)

printMetrics(folds[0][0].metrics, df)
Run Code Online (Sandbox Code Playgroud)

将打印如下内容:

Class 0.0 precision = 0.809523809524
Class 0.0 recall = 0.772727272727
Class 0.0 F1 Measure = 0.790697674419

Class 1.0 precision = 0.857142857143
Class 1.0 recall = 0.818181818182
Class 1.0 F1 Measure = 0.837209302326

Class 2.0 precision = 0.875
Class 2.0 recall = 0.875
Class 2.0 F1 Measure = 0.875

...

Weighted recall = 0.808333333333
Weighted precision = 0.812411616162
Weighted F(1) Score = 0.808461689698
Weighted F(0.5) Score = 0.810428077222
Weighted false positive rate = 0.026335560185
Accuracy = 0.808333333333
Run Code Online (Sandbox Code Playgroud)