我需要将RDD分成两部分:
满足条件的1部分; 另一部分没有.我可以filter在原始RDD上做两次但看起来效率低下.有没有办法可以做我想要的事情?我在API和文献中都找不到任何东西.
Mar*_*ier 21
Spark默认不支持此功能.如果您事先对其进行缓存,则对相同数据进行两次过滤并不是那么糟糕,并且过滤本身很快.
如果它实际上只是两种不同的类型,您可以使用辅助方法:
implicit class RDDOps[T](rdd: RDD[T]) {
def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = {
val passes = rdd.filter(f)
val fails = rdd.filter(e => !f(e)) // Spark doesn't have filterNot
(passes, fails)
}
}
val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0)
Run Code Online (Sandbox Code Playgroud)
但只要您有多种类型的数据,只需将过滤分配给新的val即可.
Spark RDD没有此类API。
这是一个基于对rdd.span的拉取请求的版本,应该可以运行:
import scala.reflect.ClassTag
import org.apache.spark.rdd._
def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = {
val splits = rdd.mapPartitions { iter =>
val (left, right) = iter.partition(p)
val iterSeq = Seq(left, right)
iterSeq.iterator
}
val left = splits.mapPartitions { iter => iter.next().toIterator}
val right = splits.mapPartitions { iter =>
iter.next()
iter.next().toIterator
}
(left, right)
}
val rdd = sc.parallelize(0 to 10, 2)
val (first, second) = split[Int](rdd, _ % 2 == 0 )
first.collect
// Array[Int] = Array(0, 2, 4, 6, 8, 10)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8097 次 |
| 最近记录: |