如何使用Spark计算累积和

Kni*_*t71 14 scala apache-spark

我有一个(String,Int)的rdd,按键排序

val data = Array(("c1",6), ("c2",3),("c3",4))
val rdd = sc.parallelize(data).sortByKey
Run Code Online (Sandbox Code Playgroud)

现在我想要启动第一个键的值为零,后续键为前一个键的总和.

例如:c1 = 0,c2 = c1的值,c3 =(c1值+ c2值),c4 =(c1 + .. + c3值)预期输出:

(c1,0), (c2,6), (c3,9)...
Run Code Online (Sandbox Code Playgroud)

是否有可能实现这一目标?我用地图尝试了它,但总和没有保留在地图内.

var sum = 0 ;
val t = keycount.map{ x => { val temp = sum; sum = sum + x._2 ; (x._1,temp); }}
Run Code Online (Sandbox Code Playgroud)

zer*_*323 18

  1. 计算每个分区的部分结果:

    val partials = rdd.mapPartitionsWithIndex((i, iter) => {
      val (keys, values) = iter.toSeq.unzip
      val sums  = values.scanLeft(0)(_ + _)
      Iterator((keys.zip(sums.tail), sums.last))
    })
    
    Run Code Online (Sandbox Code Playgroud)
  2. 收集部分金额

    val partialSums = partials.values.collect
    
    Run Code Online (Sandbox Code Playgroud)
  3. 计算分区的累积总和并广播它:

    val sumMap = sc.broadcast(
      (0 until rdd.partitions.size)
        .zip(partialSums.scanLeft(0)(_ + _))
        .toMap
    )
    
    Run Code Online (Sandbox Code Playgroud)
  4. 计算最终结果:

    val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
      val offset = sumMap.value(i)
      if (iter.isEmpty) Iterator()
      else iter.next.map{case (k, v) => (k, v + offset)}.toIterator
    })
    
    Run Code Online (Sandbox Code Playgroud)