yh1*_*190 27 hadoop scala apache-spark
我是Spark的新手.我有一个大的元素数据集[RDD],我想把它分成两个完全相同大小的分区,维护元素的顺序.我试着用RangePartitioner像
var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
Run Code Online (Sandbox Code Playgroud)
这不能给出令人满意的结果,因为它大致分割但不完全相同的大小维持元素的顺序.例如,如果有64个元素,我们使用
Rangepartitioner,然后它分为31个元素和33个元素.
我需要一个分区器,以便我在一半中获得前32个元素,而另一半包含第二组32个元素.你能否通过建议如何使用自定义分区器来帮助我,这样我可以获得相同大小的两半,保持元素的顺序?
Dan*_*bos 24
Partitioner通过为分区分配密钥来工作.您需要事先了解密钥分发,或者查看所有密钥,才能创建这样的分区程序.这就是Spark没有为您提供的原因.
通常,您不需要这样的分区程序.事实上,我无法想出一个需要大小分区的用例.如果元素的数量是奇数怎么办?
无论如何,让我们说你有一个由顺序Ints 键入的RDD ,你知道总共有多少.然后你可以写一个Partitioner这样的自定义:
class ExactPartitioner[V](
partitions: Int,
elements: Int)
extends Partitioner {
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[Int]
// `k` is assumed to go continuously from 0 to elements-1.
return k * partitions / elements
}
}
Run Code Online (Sandbox Code Playgroud)
sam*_*est 12
这个答案有一些来自丹尼尔的灵感,但提供了一个完整的实现(使用pimp我的库模式)和一个人们复制和粘贴需求的例子:)
import RDDConversions._
trait RDDWrapper[T] {
def rdd: RDD[T]
}
// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
// Here we use a single Long to try to ensure the sort is balanced,
// but for really large dataset, we may want to consider
// using a tuple of many Longs or even a GUID
def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
.grouped(numPartitions).map(t => (t._1._1, t._2))
}
case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
def grouped(size: Int): RDD[T] = {
// TODO Version where withIndex is cached
val withIndex = rdd.mapPartitions(_.zipWithIndex)
val startValues =
withIndex.mapPartitionsWithIndex((i, iter) =>
Iterator((i, iter.toIterable.last))).toArray().toList
.sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)
withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
case (value, index) => (startValues(i) + index.toLong, value)
})
.partitionBy(new Partitioner {
def numPartitions: Int = size
def getPartition(key: Any): Int =
(key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
})
.map(_._2)
}
}
Run Code Online (Sandbox Code Playgroud)
然后在另一个文件中
// TODO modify above to be implicit class, rather than have implicit conversions
object RDDConversions {
implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] =
new RichRDD[T](rdd)
implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}
Run Code Online (Sandbox Code Playgroud)
然后为你的用例你想要(假设它已经排序)
import RDDConversions._
yourRdd.grouped(2)
Run Code Online (Sandbox Code Playgroud)
免责声明:未经测试,有点直接写入SO答案
| 归档时间: |
|
| 查看次数: |
42614 次 |
| 最近记录: |