检查RDD是否相等

Aki*_*i K 5 java junit equals apache-spark

我在JUnit中进行了一些测试,我需要检查两个Spark RDD的相等性.

我想做的一种方式是这样的:

JavaRDD<SomeClass> expResult = ...;
JavaRDD<SomeClass> result = ...;

assertEquals(expResult.collect(), result.collect());
Run Code Online (Sandbox Code Playgroud)

有比这更好的方法吗?

Wil*_*ire 5

如果预期的结果相当小,那么最好是collectRDD数据并在本地进行比较(就像你写的那样).

当需要在测试中使用足够大的数据集时,几乎没有其他可能性:

免责声明:我对Spark Java API不够熟悉,因此我将在Scala中编写更多示例代码.我希望它不会成为问题,因为它可能要么用Java重写,要么转换成从Java代码调用的几个实用程序函数.

方法1:将RDD压缩在一起并逐项比较

只有在RDD中元素的顺序被很好地定义(即,RDD被排序)时,此方法才可用.

val diff = expResult
  .zip(result)
  .collect { case (a, b) if a != b => a -> b }
  .take(100)
Run Code Online (Sandbox Code Playgroud)

diff阵列将包含多达100个差异对.如果RDD足够大,并且您想从diff本地获取所有项目,则可以使用toLocalIterator方法.最好不要使用collect方法,因为你可以运行OOM.

此方法可能是最快的,因为它不需要随机播放,但只有在RDD中的分区顺序和分区中项目的顺序得到很好定义时才可以使用它.

方法2:共同组RDD

此方法可用于测试resultRDD是否包含没有任何特定顺序的指定(可能是非唯一)值

  val diff = expResult.map(_ -> 1)
    .cogroup(result.map(_ -> 1))
    .collect { case (a, (i1, i2)) if i1.sum != i2.sum => a -> (i1.sum - i2.sum) }
    .take(100)
Run Code Online (Sandbox Code Playgroud)

diff数组将包含区分值以及金额之间的差异.

例如:

  • 如果expResult包含某个值的单个实例result且不包含该值,则该数字将为+1;
  • 如果result包含3个另一个值的实例,并且expResult只有1,则该数字将为-2.

此方法将比其他选项更快(即,相互减少RDD),因为它只需要一次shuffle.