我有以下RDD有4个分区: -
val rdd=sc.parallelize(1 to 20,4)
Run Code Online (Sandbox Code Playgroud)
现在我尝试在这上面调用mapPartitions: -
scala> rdd.mapPartitions(x=> { println(x.size); x }).collect
5
5
5
5
res98: Array[Int] = Array()
Run Code Online (Sandbox Code Playgroud)
为什么它返回空数组?anonymoys函数只是返回它收到的相同迭代器,那么它是如何返回空数组的呢?有趣的是,如果我删除println语句,它确实返回非空数组: -
scala> rdd.mapPartitions(x=> { x }).collect
res101: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
Run Code Online (Sandbox Code Playgroud)
这个我不明白.为什么println(只是打印迭代器的大小)的存在会影响函数的最终结果?
Jus*_*ony 10
那是因为x
是a TraversableOnce
,这意味着你通过调用遍历它size
然后将其返回....空.
你可以通过多种方式解决它,但这里有一个:
rdd.mapPartitions(x=> {
val list = x.toList;
println(list.size);
list.toIterator
}).collect
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1989 次 |
最近记录: |