Ken*_*ath 2 scala apache-spark
请考虑Spark中的以下代码,它应该返回整数序列的sqrt的总和:
// Create an RDD of a sequence of integers
val data = sc.parallelize(Range(0,100))
// Transform RDD to sequence of Doubles
val x = data.map(_.toDouble)
// Reduce the sequence as the sum of the sqrt of each integer
// (repeated 10 times with each result stored as a kv pair)
val xReduceMultipleTimes = Range(0,10).map(n=>(n, x.reduce((x,y)=>x+Math.sqrt(y))))
Run Code Online (Sandbox Code Playgroud)
reduce操作在相同的RDD上按顺序重复多次,并且每次都应返回相同的结果.但是,我从Spark获得的输出是不一致的,并且远不及正确的值.
xReduceMultipleTimes: scala.collection.immutable.IndexedSeq[(Int, Double)] =
Vector((0,105.44288170056565), (1,245.5267945723869), (2,190.04459651854287),
(3,233.32211443903282), (4,190.04459651854287), (5,105.44288170056566),
(6,273.5022316153404), (7,105.44288170056568), (8,105.44288170056566),
(9,205.51799497636216))
Run Code Online (Sandbox Code Playgroud)
661.463正如Mathematica所证实的那样,正确的结果应该是正确的.
替换Math.sqrt(y)为y产生无根整数的正确和一致的总和(即4950).
关于什么可能导致不一致的任何想法?
平方根不是关联的.请记住,reduce做两件事,首先在本地减少,第一个参数确实是累加器,第二个参数是新值.它接下来要做的是合并中间结果,这会导致累加器添加到节点值的平方根....不是你想要的.要做到这一点,你必须使用aggregate
rdd.aggregate(0)((accum, value) => accum + Math.sqrt(value), _ + _)
Run Code Online (Sandbox Code Playgroud)
我相信这抓住了你的意图.第一个参数是0的种子,然后是在每个节点上本地运行的函数.然后,添加仅仅是添加节点,其不需要是平方根.
| 归档时间: |
|
| 查看次数: |
431 次 |
| 最近记录: |