为什么在reduce中使用sqrt的结果不一致?

Ken*_*ath 2 scala apache-spark

请考虑Spark中的以下代码,它应该返回整数序列的sqrt的总和:

// Create an RDD of a sequence of integers
val data = sc.parallelize(Range(0,100))

// Transform RDD to sequence of Doubles
val x = data.map(_.toDouble)

// Reduce the sequence as the sum of the sqrt of each integer 
// (repeated 10 times with each result stored as a kv pair)
val xReduceMultipleTimes = Range(0,10).map(n=>(n, x.reduce((x,y)=>x+Math.sqrt(y))))
Run Code Online (Sandbox Code Playgroud)

reduce操作在相同的RDD上按顺序重复多次,并且每次都应返回相同的结果.但是,我从Spark获得的输出是不一致的,并且远不及正确的值.

xReduceMultipleTimes: scala.collection.immutable.IndexedSeq[(Int, Double)] =
Vector((0,105.44288170056565), (1,245.5267945723869), (2,190.04459651854287),
(3,233.32211443903282), (4,190.04459651854287), (5,105.44288170056566), 
(6,273.5022316153404), (7,105.44288170056568), (8,105.44288170056566), 
(9,205.51799497636216))
Run Code Online (Sandbox Code Playgroud)

661.463正如Mathematica所证实的那样,正确的结果应该是正确的.

替换Math.sqrt(y)y产生无根整数的正确和一致的总和(即4950).

关于什么可能导致不一致的任何想法?

Jus*_*ony 5

平方根不是关联的.请记住,reduce做两件事,首先在本地减少,第一个参数确实是累加器,第二个参数是新值.它接下来要做的是合并中间结果,这会导致累加器添加到节点值的平方根....不是你想要的.要做到这一点,你必须使用aggregate

rdd.aggregate(0)((accum, value) => accum + Math.sqrt(value), _ + _)
Run Code Online (Sandbox Code Playgroud)

我相信这抓住了你的意图.第一个参数是0的种子,然后是在每个节点上本地运行的函数.然后,添加仅仅是添加节点,其不需要是平方根.