在Apache Spark中,为什么RDD.union不保留分区器?

tri*_*oid 22 partitioning hadoop-partitioning apache-spark

众所周知,Spark中的分区器对任何"广泛"操作都会产生巨大的性能影响,因此通常会在操作中进行自定义.我正在尝试以下代码:

val rdd1 =
  sc.parallelize(1 to 50).keyBy(_ % 10)
    .partitionBy(new HashPartitioner(10))
val rdd2 =
  sc.parallelize(200 to 230).keyBy(_ % 13)

val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)

val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)
Run Code Online (Sandbox Code Playgroud)

我看到默认情况下cogroup()总是会生成带有自定义分区程序的RDD,但union()不会,它将始终恢复为默认值.这是违反直觉的,因为我们通常假设PairRDD应该使用其第一个元素作为分区键.有没有办法"强制"Spark合并2个PairRDD以使用相同的分区键?

Dan*_*bos 40

union是一种非常有效的操作,因为它不会移动任何数据.如果rdd1有10个分区并且rdd2有20个分区,rdd1.union(rdd2)则将有30个分区:两个RDD的分区相互放置.这只是一个记账改变,没有洗牌.

但它必然会丢弃分区器.为给定数量的分区构造分区器.将所得的RDD具有多个分区是从两个不同的rdd1rdd2.

在获取联合之后,您可以运行repartition以随机抽取数据并按键组织它.


上面有一个例外.如果rdd1并且rdd2具有相同的分区(具有相同数量的分区),则union行为不同.它将成对加入两个RDD的分区,为每个输入提供相同数量的分区.这可能涉及移动数据(如果分区不在同一位置)但不会涉及随机播放.在这种情况下,保留分区程序.(此代码在PartitionerAwareUnionRDD.scala中.)

  • 实际上有一个分区器感知的联合RDD,我认为应该在可以保留分区的情况下自动使用它; 不知道为什么它不适用于此.请参阅https://github.com/apache/spark/blob/e0628f2fae7f99d096f9dd625876a60d11020d9b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala#L123和https://github.com/apache/spark /blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala (4认同)