如何以高效的方式将1个RDD分成6个部分?

Swa*_*jee 2 scala amazon-emr apache-spark

我已经构建了一个Spark RDD,其中这个RDD的每个元素都是一个表示XML Record的JAXB根元素.

我想拆分这个RDD,以便从这个集合产生6个RDD.本质上,这项工作只是将分层XML结构转换为6组平坦的CSV记录.我目前正在通过相同的RDD 6六次这样做.

 xmlRdd.cache()
 val rddofTypeA = xmlRdd.map {iterate over XML Object and create Type A}
 rddOfTypeA.saveAsTextFile("s3://...")

 val rddofTypeB = xmlRdd.map { iterate over XML Object and create Type B}
 rddOfTypeB.saveAsTextFile("s3://...")

 val rddofTypeC = xmlRdd.map { iterate over XML Object and create Type C}
 rddOfTypeC.saveAsTextFile("s3://...")

 val rddofTypeD = xmlRdd.map { iterate over XML Object and create Type D}
 rddOfTypeD.saveAsTextFile("s3://...")

 val rddofTypeE = xmlRdd.map { iterate over XML Object and create Type E}
 rddOfTypeE.saveAsTextFile("s3://...")

 val rddofTypeF = xmlRdd.map { iterate over XML Object and create Type F}
 rddOfTypeF.saveAsTextFile("s3://...")
Run Code Online (Sandbox Code Playgroud)

我的输入数据集是35百万条记录,分为186个文件,每个448MB,存储在Amazon S3中.我的输出目录也在S3上.我正在使用EMR Spark.

使用六节点m4.4xlarge群集,需要38分钟才能完成此分割并写入输出.

有没有一种有效的方法来实现这一目标,而不是六次走过RDD?

Jac*_*ski 5

最简单的解决方案(从Spark开发人员的角度来看)是在单独的线程上执行mapsaveAsTextFile每个RDD.

未被广泛知晓(并因此被利用)的SparkContext是线程安全的事实,因此可用于从单独的线程提交作业.

话虽如此,您可以执行以下操作(使用最简单的Scala解决方案,Future但不一定是最好的,因为Future在实例化时启动计算而不是在您这样说时):

xmlRdd.cache()

import scala.concurrent.ExecutionContext.Implicits.global
val f1 = Future {
  val rddofTypeA = xmlRdd.map { map xml to csv}
  rddOfTypeA.saveAsTextFile("s3://...")
}

val f2 = Future {
  val rddofTypeB = xmlRdd.map { map xml to csv}
  rddOfTypeB.saveAsTextFile("s3://...")
}

...

Future.sequence(Seq(f1,f2)).onComplete { ... }
Run Code Online (Sandbox Code Playgroud)

这可能会缩短进行映射和保存的时间,但不会减少数据集上的扫描次数.无论如何,这应该不是什么大问题,因为数据集是缓存的,因此在内存和/或磁盘中(默认持久性级别MEMORY_AND_DISK在Spark SQL中Dataset.cache).