RDD中是否有任何操作可以保持订单?

Eas*_*sun 6 reduce scala fold apache-spark rdd

我希望在RDD性能方面采取行动,reduce但不需要操作员可交换.即我希望result跟随永远是"123456789".

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val result = rdd.someAction{ _+_ }
Run Code Online (Sandbox Code Playgroud)

首先,我找到了fold.文件RDD#fold说:

def fold(zeroValue:T)(op:(T,T)⇒T):T使用给定的关联函数和中性"零值" 聚合每个分区的元素,然后聚合所有分区的结果

请注意,doc中不需要交换.但是,结果并不像预期的那样:

scala> rdd.fold(""){ _+_ }
res10: String = 312456879
Run Code Online (Sandbox Code Playgroud)

编辑我试过@ dk14提到的,没有运气:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res22: String = 341276895

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res23: String = 914856273

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res24: String = 742539618

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res25: String = 271468359
Run Code Online (Sandbox Code Playgroud)

zer*_*323 2

Scala 中没有满足此标准的内置归约操作,但您可以通过组合mapPartitions,collect和局部归约轻松实现自己的归约操作:

import scala.reflect.ClassTag

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = {
  rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f)
}
Run Code Online (Sandbox Code Playgroud)

使用collect和的组合reduce进行合并,而不是使用 by 的异步和无序方法,fold可确保保留全局顺序。

这当然会带来一些额外费用,包括:

  • 驱动程序的内存占用略高。
  • 显着更高的延迟 - 在开始本地缩减之前,我们明确等待所有任务完成。