Spark:测试RDD是否为空的有效方法

Tob*_*ber 26 scala apache-spark rdd

isEmptyRDD上没有方法,那么如果RDD为空,最有效的测试方法是什么?

Tob*_*ber 28

RDD.isEmpty() 将成为Spark 1.3.0的一部分.

根据这个apache邮件线程中的建议以及后来对这个答案的一些评论,我做了一些小的本地实验.最好的方法是使用take(1).length==0.

def isEmpty[T](rdd : RDD[T]) = {
  rdd.take(1).length == 0 
}
Run Code Online (Sandbox Code Playgroud)

它应该在O(1)RDD为空时运行,在这种情况下,它在分区数量上是线性的.

感谢Josh Rosen和Nick Chammas指出这一点.

注意:如果RDD类型为这个失败RDD[Nothing]例如isEmpty(sc.parallelize(Seq())),但这很可能不是在现实生活中的问题.isEmpty(sc.parallelize(Seq[Any]()))工作良好.


编辑:

  • 编辑1:添加take(1)==0方法,感谢评论.

我原来的建议:使用mapPartitions.

def isEmpty[T](rdd : RDD[T]) = {
  rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_) 
}
Run Code Online (Sandbox Code Playgroud)

它应该按分区数量进行扩展,并且不像它那样干净take(1).然而,它对于RDD类型来说是健壮的RDD[Nothing].


实验:

我用这个代码来表示时间.

def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
  val start = System.currentTimeMillis()
  val rdd = sc.parallelize(1L to n, numSlices = 100)
  val result = f(rdd)
  printf("Time: " + (System.currentTimeMillis() - start) + "   Result: " + result)
}

time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)

time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)

time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)
Run Code Online (Sandbox Code Playgroud)

在具有3个工作核心的本地计算机上,我得到了这些结果

Time:    21   Result: false
Time:    75   Result: false
Time:  8664   Result: false
Time: 18266   Result: false
Time: 23836   Result: false

Time:   113   Result: false
Time:   101   Result: false
Time:    68   Result: false
Time:   221   Result: false
Time:    46   Result: false

Time:    79   Result: true
Time:    93   Result: true
Time:    79   Result: true
Time:   100   Result: true
Time:    64   Result: true
Run Code Online (Sandbox Code Playgroud)

  • Spark最近合并了一个pull请求,为RDD添加一个`isEmpty`方法:https://github.com/apache/spark/pull/4074 (3认同)