使用PartitionBy按键拆分和有效计算RDD组

Seb*_*Seb 7 apache-spark rdd

我已经实现了一个RDD[K, V]按键分组的解决方案,并根据每个组计算数据(K, RDD[V]),使用partitionByPartitioner.不过,我不确定它是否真的有效,我想有你的观点.

下面是一个示例案例:根据列表[K: Int, V: Int],计算V每个组的s均值K,知道它应该是分布式的,并且V值可能非常大.这应该给:

List[K, V] => (K, mean(V))
Run Code Online (Sandbox Code Playgroud)

简单的Partitioner类:

class MyPartitioner(maxKey: Int) extends Partitioner {

    def numPartitions = maxKey

    def getPartition(key: Any): Int = key match {
      case i: Int if i < maxKey => i
    }
  }
Run Code Online (Sandbox Code Playgroud)

分区代码:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))

      val rdd = sc.parallelize(l)
      val p =  rdd.partitionBy(new MyPartitioner(4)).cache()

      p.foreachPartition(x => {
        try {
          val r = sc.parallelize(x.toList)
          val id = r.first() //get the K partition id
          val v = r.map(x => x._2)
          println(id._1 + "->" + mean(v))
        } catch {
          case e: UnsupportedOperationException => 0
        }
      })
Run Code Online (Sandbox Code Playgroud)

输出是:

1->13, 2->4, 3->7

我的问题是:

  1. 打电话时真的发生了什么partitionBy?(对不起,我没有找到足够的规格)
  2. 通过分区映射是否真的有效,知道在我的生产情况下,非常多的值(样本为100万)不会太多的密钥(样本为50)
  3. 费用是paralellize(x.toList)多少?这样做是否一致?(我需要一个RDD输入mean())
  4. 你会怎么做自己做的?

问候

Dan*_*bos 4

你的代码不应该工作。您不能将SparkContext对象传递给执行者。(不是Serializable。)而且我不明白你为什么需要这样做。

要计算平均值,您需要计算总和和计数并计算它们的比率。默认分区器就可以了。

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = {
  case class SumCount(sum: Double, count: Double)
  val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0),
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count))
  sumCounts.map(sc => sc.sum / sc.count)
}
Run Code Online (Sandbox Code Playgroud)

这是一种高效的单遍计算,具有良好的泛化性。