无法使用不等数量的分区压缩RDD

赵祥宇*_*赵祥宇 2 apache-spark rdd

现在我有3个这样的RDD:

RDD1集:

1 2
3 4
5 6
7 8
9 10
Run Code Online (Sandbox Code Playgroud)

RDD2:

11 12
13 14
Run Code Online (Sandbox Code Playgroud)

rdd3:

15 16
17 18
19 20
Run Code Online (Sandbox Code Playgroud)

我想这样做:

rdd1.zip(rdd2.union(rdd3))
Run Code Online (Sandbox Code Playgroud)

我希望结果是这样的:

1 2 11 12
3 4 13 14
5 6 15 16
7 8 17 18
9 10 19 20
Run Code Online (Sandbox Code Playgroud)

但我有这样的例外:

线程"main"中的异常java.lang.IllegalArgumentException:无法压缩具有不等分区数的RDD

有人告诉我,我可以毫无例外地做到这一点:

rdd1.zip(rdd2.union(rdd3).repartition(1))
Run Code Online (Sandbox Code Playgroud)

但似乎这只是一点点成本.所以我想知道是否有其他方法可以解决这个问题.

Dan*_*bos 7

我不确定你的"费用"是什么意思,但你怀疑这repartition(1)不是正确的解决方案.它会将RDD重新分区为单个分区.

  • 如果您的数据不适合单台计算机,则会失败.
  • 只有rdd1具有单个分区才有效.当您有更多数据时,这可能不再适用.
  • repartition执行随机播放,因此您的数据最终可能会以不同方式排序.

我认为正确的解决方案是放弃使用zip,因为您可能无法确保分区匹配.创建一个密钥并使用join:

val indexedRDD1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
val indexedRDD2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }
val offset = rdd2.count
val indexedRDD3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }
val combined =
  indexedRDD1.leftOuterJoin(indexedRDD2).leftOuterJoin(indexedRDD3).map {
    case (i, ((v1, v2Opt), v3Opt)) => i -> (v1, v2Opt.getOrElse(v3Opt.get))
  }
Run Code Online (Sandbox Code Playgroud)

无论分区如何,这都可以.如果您愿意,可以对结果进行排序并删除最后的索引:

val unindexed = combined.sortByKey().values
Run Code Online (Sandbox Code Playgroud)