dol*_*eng 3 scala apache-spark rdd
我正在运行为Hadoop-2.4预先构建的Spark-1.4.0(在本地模式下)来计算DoubleRDD的平方和.我的Scala代码看起来像
sc.parallelize(Array(2., 3.)).fold(0.0)((p, v) => p+v*v)
Run Code Online (Sandbox Code Playgroud)
它给出了令人惊讶的结果97.0.
与Scala版本相比,这是非常反直觉的 fold
Array(2., 3.).fold(0.0)((p, v) => p+v*v)
Run Code Online (Sandbox Code Playgroud)
这给出了预期的答案13.0.
由于缺乏理解,我很可能在代码中犯了一些棘手的错误.我已经读过有关如何使用的函数RDD.fold()应该是通信的,否则结果可能取决于分区等.例如,如果我将分区数更改为1,
sc.parallelize(Array(2., 3.), 1).fold(0.0)((p, v) => p+v*v)
Run Code Online (Sandbox Code Playgroud)
代码会169.0在我的机器上给我!
有人可以解释这里究竟发生了什么吗?
嗯,官方文档实际上很好地解释了它:
使用给定的关联和交换函数以及中性"零值",聚合每个分区的元素,然后聚合所有分区的结果.函数op(t1,t2)允许修改t1并将其作为结果值返回以避免对象分配; 但是,它不应该修改t2.
这与在Scala等函数语言中为非分布式集合实现的折叠操作有所不同.该折叠操作可以单独地应用于分区,然后将这些结果折叠成最终结果,而不是以某种定义的顺序将折叠顺序地应用于每个元素.对于不可交换的函数,结果可能与应用于非分布式集合的折叠的结果不同.
为了说明发生了什么,让我们尝试逐步模拟正在发生的事情:
val rdd = sc.parallelize(Array(2., 3.))
val byPartition = rdd.mapPartitions(
iter => Array(iter.fold(0.0)((p, v) => (p + v * v))).toIterator).collect()
Run Code Online (Sandbox Code Playgroud)
它给我们类似这种Array[Double] = Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0)和
byPartition.reduce((p, v) => (p + v * v))
Run Code Online (Sandbox Code Playgroud)
返回97
需要注意的重要一点是,结果可能因运行而异,具体取决于组合分区的顺序.
| 归档时间: |
|
| 查看次数: |
5236 次 |
| 最近记录: |