尽管重新分区,但只能在每个分区中压缩具有相同数量元素的RDD

KyB*_*yBe 4 scala apache-spark rdd

我加载了一个数据集

val data = sc.textFile("/home/kybe/Documents/datasets/img.csv",defp)
Run Code Online (Sandbox Code Playgroud)

我想在这个数据上放一个索引

val nb = data.count.toInt
val tozip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)

val res = tozip.zip(data)
Run Code Online (Sandbox Code Playgroud)

不幸的是我有以下错误

Can only zip RDDs with same number of elements in each partition
Run Code Online (Sandbox Code Playgroud)

如果可能的话,我怎么能按分区修改元素的数量?

Mat*_*zyk 9

为什么它不起作用?

zip()的文档说明:

将RDD与另一个RDD一起切换,返回键值对,每个RDD中的第一个元素,每个RDD中的第二个元素等.假设两个RDD在每个分区中具有相同数量的分区相同数量的元素(例如,一个是通过另一个地图制作的.

所以我们需要确保我们满足两个条件:

  • 两个RDD具有相同数量的分区
  • 这些RDD中的相应分区具有完全相同的大小

您确保具有相同数量的分区,repartition()但Spark不保证您在每个RDD的每个分区中具有相同的分布.

这是为什么?

因为有不同类型的RDD,并且大多数都有不同的分区策略!例如:

  • ParallelCollectionRDD是在您使用sc.parallelize(collection)它并行化集合时创建的,它将查看应该有多少个分区,将检查集合的step大小并计算大小.即列表中有15个元素并且需要4个分区,前3个将具有4个连续元素,最后一个将具有剩余3个.
  • HadoopRDD如果我没记错的话,每个文件块一个分区.即使您在内部使用本地文件,当您读取本地文件然后映射该RDD时,Spark首先创建这种RDD,因为该RDD是一对RDD <Long, Text>并且您只是想要String:-)
  • etc.etc.

在你的例子中,Spark内部会在进行重新分区时创建不同类型的RDD(CoalescedRDDShuffledRDD),但我认为你得到了不同RDD具有不同分区策略的全局概念:-)

请注意,zip()doc 的最后一部分提到了map()操作.此操作不会重新分区,因为它是一个狭窄的转换数据,因此它可以保证两种条件.

在这个简单的例子中,你可以简单地提到它data.zipWithIndex.如果你需要更复杂的东西,那么创建新的RDD zip()应该map()如上所述创建.