Mar*_*ara 4 scala apache-spark pyspark
我在Spark中有两组数据(我们称它们为d1,d2)。我想执行两次样本Kolmogorov-Smirnov测试,以测试其基础人口分布函数是否不同。MLLib的Statistics.kolmogorovSmirnovTest可以这样做吗?
文档提供了以下示例:
import org.apache.spark.mllib.stat.Statistics
val data: RDD[Double] = ... // an RDD of sample data
// perform a KS test using a cumulative distribution function of our making
val myCDF: Double => Double = ...
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
Run Code Online (Sandbox Code Playgroud)
我尝试计算d2的经验累积分布函数(将其收集为Map)并将其与d1进行比较。
Statistics.kolmogorovSmirnovTest(d1, ecdf_map)
Run Code Online (Sandbox Code Playgroud)
测试运行,但结果错误。
难道我做错了什么?是否有可能做到这一点?有任何想法吗?
感谢您的帮助!
在Spark Mllib中,KolmogorovSmirnovTest是单采样的和双面的。因此,如果您想要特定的两次采样变体,则在此库中是不可能的。但是,您仍然可以通过计算经验累积分布函数来比较数据集(我找到了一个库来这样做,所以如果结果很好,我将更新此答案)或使用与正态分布的偏差。在此示例中,我将继续介绍。
为了进行此测试,我生成了3个分布:2个看起来相似的三角形分布和一个指数分布,以显示统计数据的巨大差异。
注意: 我找不到任何科学论文将这种方法描述为可行的分布比较方法,因此该想法主要是经验性的。
对于每种分布,您都可以最明确地找到其CDF与正态分布之间具有相同全局最大距离的镜像分布。
下一步是在给定的均值和标准偏差的情况下针对正态分布获取KS结果。我对它们进行了可视化处理以获得更好的图像:
如您所见,三角分布的结果(KS统计量和p值)彼此接近,而指数分布却遥遥无期。如我在注释中所述,您可以通过镜像数据集来轻松地愚弄此方法,但对于真实世界的数据来说可以。
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.stat.Statistics
import org.apache.commons.math3.distribution.{ ExponentialDistribution, TriangularDistribution }
import breeze.plot._
import breeze.linalg._
import breeze.numerics._
object Main {
def main( args: Array[ String ] ): Unit = {
val conf =
new SparkConf()
.setAppName( "SO Spark" )
.setMaster( "local[*]" )
.set( "spark.driver.host", "localhost" )
val sc = new SparkContext( conf )
// Create similar distributions
val triDist1 = new TriangularDistribution( -3, 5, 7 )
val triDist2 = new TriangularDistribution( -3, 7, 7 )
// Exponential distribution to show big difference
val expDist1 = new ExponentialDistribution( 0.6 )
// Sample data from the distributions and parallelize it
val n = 100000
val sampledTriDist1 = sc.parallelize( triDist1.sample( n ) )
val sampledTriDist2 = sc.parallelize( triDist2.sample( n ) )
val sampledExpDist1 = sc.parallelize( expDist1.sample( n ) )
// KS tests
val resultTriDist1 = Statistics
.kolmogorovSmirnovTest( sampledTriDist1,
"norm",
sampledTriDist1.mean,
sampledTriDist1.stdev )
val resultTriDist2 = Statistics
.kolmogorovSmirnovTest( sampledTriDist2,
"norm",
sampledTriDist2.mean,
sampledTriDist2.stdev )
val resultExpDist1 = Statistics
.kolmogorovSmirnovTest( sampledExpDist1,
"norm",
sampledExpDist1.mean,
sampledExpDist1.stdev )
// Results
val statsTriDist1 =
"Tri1: ( " +
resultTriDist1.statistic +
", " +
resultTriDist1.pValue +
" )"
val statsTriDist2 =
"Tri2: ( " +
resultTriDist2.statistic +
", " +
resultTriDist2.pValue +
" )"
val statsExpDist1 =
"Exp1: ( " +
resultExpDist1.statistic +
", " +
resultExpDist1.pValue +
" )"
println( statsTriDist1 )
println( statsTriDist2 )
println( statsExpDist1 )
// Visualize
val graphCanvas = Figure()
val mainPlot =
graphCanvas
.subplot( 0 )
mainPlot.legend = true
val x = linspace( 1, n, n )
mainPlot += plot( x,
sampledTriDist1.sortBy( x => x ).take( n ),
name = statsTriDist1 )
mainPlot += plot( x,
sampledTriDist2.sortBy( x => x ).take( n ),
name = statsTriDist2 )
mainPlot += plot( x,
sampledExpDist1.sortBy( x => x ).take( n ),
name = statsExpDist1 )
mainPlot.xlabel = "x"
mainPlot.ylabel = "sorted sample"
mainPlot.title = "KS results for 2 Triangular and 1 Exponential Distributions"
graphCanvas.saveas( "ks-sample.png", 300 )
sc.stop()
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1230 次 |
| 最近记录: |