Aggregate RDD values per key

Use*_*rrr 1 scala aggregate-functions apache-spark rdd

I have an RDD of key/value pairs of the form (someKey, (measure1, measure2)). I group by key, and now I want to aggregate the values for each key.

val RDD1 : RDD[(String,(Int,Int))]
RDD1.groupByKey()

The result I need is:

key: avg(measure1), avg(measure2), max(measure1), max(measure2), min(measure1), min(measure2), count(*)

Nik*_*ita 5

First of all, avoid groupByKey! You should use aggregateByKey or combineByKey; here we'll use aggregateByKey. This function transforms the values for each key: RDD[(K, V)] => RDD[(K, U)]. It needs a zero value of type U and knowledge of how to merge values into the accumulator, (U, V) => U, and how to merge two accumulators, (U, U) => U. I have simplified your example and will compute: key: avg(measure1), avg(measure2), min(measure1), min(measure2), count(*)

  val rdd1 = sc.parallelize(List(("a", (11, 1)), ("a",(12, 3)), ("b",(10, 1))))
  rdd1
    .aggregateByKey((0.0, 0.0, Int.MaxValue, Int.MaxValue, 0))(
      {
        case ((sum1, sum2, min1, min2, count1), (v1, v2)) =>
          (sum1 + v1, sum2 + v2, v1 min min1, v2 min min2, count1+1)
      }, 
      {
        case ((sum1, sum2, min1, min2, count),
          (otherSum1, otherSum2, otherMin1, otherMin2, otherCount)) =>
          (sum1 + otherSum1, sum2 + otherSum2, 
           min1 min otherMin1, min2 min otherMin2, count + otherCount)
      }
    )
    .map {
      case (k, (sum1, sum2, min1, min2, count1)) => (k, (sum1/count1, sum2/count1, min1, min2, count1))
    }
    .collect()

   (a,(11.5,2.0,11,1,2)), (b,(10.0,1.0,10,1,1))
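If you also need max(measure1) and max(measure2) as in the original question, just widen the accumulator tuple with two max fields seeded at Int.MinValue. A minimal sketch of the same seqOp/combOp logic, written with plain Scala collections (foldLeft stands in for aggregateByKey's per-partition fold, combOp for the cross-partition merge) so it runs without a Spark cluster; the object name AggregateSketch is my own:

```scala
object AggregateSketch {
  // (sum1, sum2, min1, min2, max1, max2, count)
  type Acc = (Double, Double, Int, Int, Int, Int, Int)

  val zero: Acc = (0.0, 0.0, Int.MaxValue, Int.MaxValue, Int.MinValue, Int.MinValue, 0)

  // seqOp: fold one (measure1, measure2) value into the accumulator
  def seqOp(acc: Acc, v: (Int, Int)): Acc = {
    val (s1, s2, mn1, mn2, mx1, mx2, c) = acc
    val (v1, v2) = v
    (s1 + v1, s2 + v2, mn1 min v1, mn2 min v2, mx1 max v1, mx2 max v2, c + 1)
  }

  // combOp: merge two partial accumulators (per-partition results in Spark)
  def combOp(a: Acc, b: Acc): Acc =
    (a._1 + b._1, a._2 + b._2, a._3 min b._3, a._4 min b._4,
     a._5 max b._5, a._6 max b._6, a._7 + b._7)

  def aggregate(data: List[(String, (Int, Int))]): Map[String, (Double, Double, Int, Int, Int, Int, Int)] =
    data.groupBy(_._1).map { case (k, pairs) =>
      val (s1, s2, mn1, mn2, mx1, mx2, c) = pairs.map(_._2).foldLeft(zero)(seqOp)
      (k, (s1 / c, s2 / c, mn1, mn2, mx1, mx2, c))
    }

  def main(args: Array[String]): Unit = {
    val data = List(("a", (11, 1)), ("a", (12, 3)), ("b", (10, 1)))
    println(aggregate(data))
    // a -> (11.5, 2.0, 11, 1, 12, 3, 2), b -> (10.0, 1.0, 10, 1, 10, 1, 1)
  }
}
```

In Spark you would pass the same seqOp and combOp to rdd1.aggregateByKey(zero)(seqOp, combOp), then divide the sums by the count in a final map, exactly as in the answer above.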