KyB*_*yBe 4 scala apache-spark rdd
我加载了一个数据集
val data = sc.textFile("/home/kybe/Documents/datasets/img.csv",defp)
Run Code Online (Sandbox Code Playgroud)
我想在这个数据上放一个索引
val nb = data.count.toInt
val tozip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)
val res = tozip.zip(data)
Run Code Online (Sandbox Code Playgroud)
不幸的是我有以下错误
Can only zip RDDs with same number of elements in each partition
Run Code Online (Sandbox Code Playgroud)
如果可能的话,我怎么能按分区修改元素的数量?
zip()的文档说明:
将RDD与另一个RDD一起切换,返回键值对,每个RDD中的第一个元素,每个RDD中的第二个元素等.假设两个RDD在每个分区中具有相同数量的分区和相同数量的元素(例如,一个是通过另一个地图制作的.
所以我们需要确保我们满足两个条件:
您确保具有相同数量的分区,repartition()
但Spark不保证您在每个RDD的每个分区中具有相同的分布.
因为有不同类型的RDD,并且大多数都有不同的分区策略!例如:
sc.parallelize(collection)
它并行化集合时创建的,它将查看应该有多少个分区,将检查集合的step
大小并计算大小.即列表中有15个元素并且需要4个分区,前3个将具有4个连续元素,最后一个将具有剩余3个.<Long, Text>
并且您只是想要String
:-)在你的例子中,Spark内部会在进行重新分区时创建不同类型的RDD(CoalescedRDD
和ShuffledRDD
),但我认为你得到了不同RDD具有不同分区策略的全局概念:-)
请注意,zip()
doc 的最后一部分提到了map()
操作.此操作不会重新分区,因为它是一个狭窄的转换数据,因此它可以保证两种条件.
在这个简单的例子中,你可以简单地提到它data.zipWithIndex
.如果你需要更复杂的东西,那么创建新的RDD zip()
应该map()
如上所述创建.
归档时间: |
|
查看次数: |
2785 次 |
最近记录: |