Apache Spark-两样本Kolmogorov-Smirnov测试

Mar*_*ara 4 scala apache-spark pyspark

我在Spark中有两组数据(我们称它们为d1,d2)。我想执行两次样本Kolmogorov-Smirnov测试,以测试其基础人口分布函数是否不同。MLLib的Statistics.kolmogorovSmirnovTest可以这样做吗?

文档提供了以下示例:

import org.apache.spark.mllib.stat.Statistics

val data: RDD[Double] = ... // an RDD of sample data

// perform a KS test using a cumulative distribution function of our making
val myCDF: Double => Double = ...
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
Run Code Online (Sandbox Code Playgroud)

我尝试计算d2的经验累积分布函数(将其收集为Map)并将其与d1进行比较。

Statistics.kolmogorovSmirnovTest(d1, ecdf_map)
Run Code Online (Sandbox Code Playgroud)

测试运行,但结果错误。

难道我做错了什么?是否有可能做到这一点?有任何想法吗?

感谢您的帮助!

Den*_*soi 6

在Spark Mllib中,KolmogorovSmirnovTest是单采样的和双面的。因此,如果您想要特定的两次采样变体,则在此库中是不可能的。但是,您仍然可以通过计算经验累积分布函数来比较数据集(我找到了一个库来这样做,所以如果结果很好,我将更新此答案)或使用与正态分布的偏差。在此示例中,我将继续介绍。

通过KST统计数据与正态分布比较数据集

为了进行此测试,我生成了3个分布:2个看起来相似的三角形分布和一个指数分布,以显示统计数据的巨大差异。

注意: 我找不到任何科学论文将这种方法描述为可行的分布比较方法,因此该想法主要是经验性的。

对于每种分布,您都可以最明确地找到其CDF与正态分布之间具有相同全局最大距离的镜像分布。

下一步是在给定的均值和标准偏差的情况下针对正态分布获取KS结果。我对它们进行了可视化处理以获得更好的图像:

轻松可视化分布及其KS测试结果

如您所见,三角分布的结果(KS统计量和p值)彼此接近,而指数分布却遥遥无期。如我在注释中所述,您可以通过镜像数据集来轻松地愚弄此方法,但对于真实世界的数据来说可以。

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.stat.Statistics

import org.apache.commons.math3.distribution.{ ExponentialDistribution, TriangularDistribution }

import breeze.plot._
import breeze.linalg._
import breeze.numerics._

object Main {

    def main( args: Array[ String ] ): Unit = {

        val conf = 
            new SparkConf()
            .setAppName( "SO Spark" )
            .setMaster( "local[*]" )
            .set( "spark.driver.host", "localhost" )

        val sc = new SparkContext( conf )

        // Create similar distributions
        val triDist1 = new TriangularDistribution( -3, 5, 7 )
        val triDist2 = new TriangularDistribution( -3, 7, 7 )

        // Exponential distribution to show big difference
        val expDist1 = new ExponentialDistribution( 0.6 )

        // Sample data from the distributions and parallelize it
        val n = 100000
        val sampledTriDist1 = sc.parallelize( triDist1.sample( n ) )
        val sampledTriDist2 = sc.parallelize( triDist2.sample( n ) )
        val sampledExpDist1 = sc.parallelize( expDist1.sample( n ) )

        // KS tests
        val resultTriDist1 = Statistics
            .kolmogorovSmirnovTest( sampledTriDist1, 
                                    "norm", 
                                    sampledTriDist1.mean, 
                                    sampledTriDist1.stdev )

        val resultTriDist2 = Statistics
            .kolmogorovSmirnovTest( sampledTriDist2, 
                                    "norm", 
                                    sampledTriDist2.mean, 
                                    sampledTriDist2.stdev )

        val resultExpDist1 = Statistics
            .kolmogorovSmirnovTest( sampledExpDist1, 
                                    "norm", 
                                    sampledExpDist1.mean, 
                                    sampledExpDist1.stdev )

        // Results
        val statsTriDist1 = 
            "Tri1: ( " + 
            resultTriDist1.statistic + 
            ", " + 
            resultTriDist1.pValue + 
            " )"

        val statsTriDist2 = 
            "Tri2: ( " + 
            resultTriDist2.statistic + 
            ", " + 
            resultTriDist2.pValue + 
            " )"

        val statsExpDist1 = 
            "Exp1: ( " + 
            resultExpDist1.statistic + 
            ", " + 
            resultExpDist1.pValue + 
            " )"  

        println( statsTriDist1 )
        println( statsTriDist2 )
        println( statsExpDist1 )

        // Visualize
        val graphCanvas = Figure()

        val mainPlot = 
            graphCanvas
            .subplot( 0 )

        mainPlot.legend = true

        val x = linspace( 1, n, n )      

        mainPlot += plot( x, 
                          sampledTriDist1.sortBy( x => x ).take( n ), 
                          name = statsTriDist1 )

        mainPlot += plot( x, 
                          sampledTriDist2.sortBy( x => x ).take( n ), 
                          name = statsTriDist2 )

        mainPlot += plot( x, 
                          sampledExpDist1.sortBy( x => x ).take( n ), 
                          name = statsExpDist1 )

        mainPlot.xlabel = "x"
        mainPlot.ylabel = "sorted sample"

        mainPlot.title = "KS results for 2 Triangular and 1 Exponential Distributions"

        graphCanvas.saveas( "ks-sample.png", 300 )

        sc.stop()
    }
}
Run Code Online (Sandbox Code Playgroud)