如何为同等大小的分区的Spark RDD定义自定义分区程序,其中每个分区具有相同数量的元素?

yh1*_*190 27 hadoop scala apache-spark

我是Spark的新手.我有一个大的元素数据集[RDD],我想把它分成两个完全相同大小的分区,维护元素的顺序.我试着用RangePartitioner

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
Run Code Online (Sandbox Code Playgroud)

这不能给出令人满意的结果,因为它大致分割但不完全相同的大小维持元素的顺序.例如,如果有64个元素,我们使用 Rangepartitioner,然后它分为31个元素和33个元素.

我需要一个分区器,以便我在一半中获得前32个元素,而另一半包含第二组32个元素.你能否通过建议如何使用自定义分区器来帮助我,这样我可以获得相同大小的两半,保持元素的顺序?

Dan*_*bos 24

Partitioner通过为分区分配密钥来工作.您需要事先了解密钥分发,或者查看所有密钥,才能创建这样的分区程序.这就是Spark没有为您提供的原因.

通常,您不需要这样的分区程序.事实上,我无法想出一个需要大小分区的用例.如果元素的数量是奇数怎么办?

无论如何,让我们说你有一个由顺序Ints 键入的RDD ,你知道总共有多少.然后你可以写一个Partitioner这样的自定义:

class ExactPartitioner[V](
    partitions: Int,
    elements: Int)
  extends Partitioner {

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    // `k` is assumed to go continuously from 0 to elements-1.
    return k * partitions / elements
  }
}
Run Code Online (Sandbox Code Playgroud)

  • 一旦定义了这个新类,你用它来调用它?RDD中的分区程序是val,我无法更改它,如果我使用此自定义分区程序定义新的RDD,如何使用方法创建它? (7认同)
  • 请注意,还有另一种方法可以影响您的分区方式.默认情况下它使用[`HashPartitioner`](http://stackoverflow.com/q/31424396/744133),所以通过覆盖你的[`hashCode`](https://docs.oracle.com/javase/8) /docs/api/java/lang/Object.html#hashCode--)方法,你也直接影响分区. (6认同)

sam*_*est 12

这个答案有一些来自丹尼尔的灵感,但提供了一个完整的实现(使用pimp我的库模式)和一个人们复制和粘贴需求的例子:)

import RDDConversions._

trait RDDWrapper[T] {
  def rdd: RDD[T]
}

// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
  rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
  // Here we use a single Long to try to ensure the sort is balanced, 
  // but for really large dataset, we may want to consider
  // using a tuple of many Longs or even a GUID
  def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
    rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
    .grouped(numPartitions).map(t => (t._1._1, t._2))
}

case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
  def grouped(size: Int): RDD[T] = {
    // TODO Version where withIndex is cached
    val withIndex = rdd.mapPartitions(_.zipWithIndex)

    val startValues =
      withIndex.mapPartitionsWithIndex((i, iter) => 
        Iterator((i, iter.toIterable.last))).toArray().toList
      .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)

    withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
      case (value, index) => (startValues(i) + index.toLong, value)
    })
    .partitionBy(new Partitioner {
      def numPartitions: Int = size
      def getPartition(key: Any): Int = 
        (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
    })
    .map(_._2)
  }
}
Run Code Online (Sandbox Code Playgroud)

然后在另一个文件中

// TODO modify above to be implicit class, rather than have implicit conversions
object RDDConversions {
  implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] = 
    new RichRDD[T](rdd)
  implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
  implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}
Run Code Online (Sandbox Code Playgroud)

然后为你的用例你想要(假设它已经排序)

import RDDConversions._

yourRdd.grouped(2)
Run Code Online (Sandbox Code Playgroud)

免责声明:未经测试,有点直接写入SO答案