Gle*_*ker 4 sorting union scala apache-spark rdd
我正在尝试与已经分布在我们集群中的 RDD 结合起来,并在键上进行散列分区。我不需要保留任何排序甚至分区,我只希望联合尽可能快。在这个例子中,我实际上确实想要所有记录,而不仅仅是不同的记录,而是保持多样性。
这是我天真地使用的内容:
val newRDD = tempRDD1.union(tempRDD2)
Run Code Online (Sandbox Code Playgroud)
这是有人向我推荐的更快,因为它利用了 RDD 已经分区和分布的方式:
val newRDD = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning=true)((iter, iter2) => iter++iter2)
Run Code Online (Sandbox Code Playgroud)
其中哪个更快?结果是否完全一致,在成员方面?
我问这个是因为到目前为止我认为这些方法是等效的,但是当我增加数据的规模和分区、执行程序、内存等的数量时,我得到了 zipPartitions 方法的奇怪结果,这不是之后使用 reduceByKey 正常工作。
也许我的差异是由于我的 RDD 本身,它们具有 ((String, String), (String, Long, Long, Long, Long)) 形式,所以也许 iter++iter2 正在做一些事情而不是联合这些值?
zipPartitions 是否隐式地做了一些额外的事情,比如比较排序,或者重新散列的东西,或者通常以不同于联合的方式实现合并?
如果 RDD 包含非不同行、多个键副本、空分区、键的哈希冲突或任何其他此类问题,union-vs-zipPartitions 会返回不同的结果吗?
是的,我可以自己运行测试(事实上,过去 2 天我已经这样做了!),所以请不要发布任何愚蠢的问题,问我是否尝试过这样那样的......我是提出这个问题是为了更好地了解代码级别的幕后情况。“union”是作为“zipPartitions”的子案例编写的吗?
稍后编辑:按照@Holden 的建议添加一些带有 toDebugString 结果的示例
val tempIntermediateRDD6 = tempIntermediateRDD1.
zipPartitions(tempIntermediateRDD2, true)((iter, iter2) => iter++iter2).
zipPartitions(tempIntermediateRDD5, true)((iter, iter2) => iter++iter2).
partitionBy(partitioner).
setName("tempIntermediateRDD6").
persist(StorageLevel.MEMORY_AND_DISK_SER)
tempIntermediateRDD6.checkpoint
println(tempIntermediateRDD6.toDebugString)
// (2568) tempIntermediateRDD6 ZippedPartitionsRDD2[169] at zipPartitions at mycode.scala:3203 [Disk Memory Serialized 1x Replicated]
// | ZippedPartitionsRDD2[168] at zipPartitions at mycode.scala:3202 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]
Run Code Online (Sandbox Code Playgroud)
相对:
val tempIntermediateRDD6 = tempIntermediateRDD1.
union(tempIntermediateRDD2).
union(tempIntermediateRDD5).
partitionBy(partitioner).
setName("tempIntermediateRDD6").
persist(StorageLevel.MEMORY_AND_DISK_SER)
tempIntermediateRDD6.checkpoint
println(tempIntermediateRDD6.toDebugString)
// (2568) tempIntermediateRDD6 ShuffledRDD[170] at partitionBy at mycode.scala:3208 [Disk Memory Serialized 1x Replicated]
// +-(5136) UnionRDD[169] at union at mycode.scala:3207 [Disk Memory Serialized 1x Replicated]
// | PartitionerAwareUnionRDD[168] at union at mycode.scala:3206 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]
Run Code Online (Sandbox Code Playgroud)
Union 返回一个专门的UnionRDD,我们可以UnionRDD.scala在 Spark 项目中查看它是如何编写的。查看它我们可以看到它Union实际上是使用以下代码块实现的:
override def getPartitions: Array[Partition] = {
val array = new Array[Partition](rdds.map(_.partitions.length).sum)
var pos = 0
for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
pos += 1
}
array
}
Run Code Online (Sandbox Code Playgroud)
如果您对 RDD 上的底层计算感到好奇,我建议您toDebugString在生成的 RDD 上使用该函数。然后,您可以看到依赖项 DAG 的样子。