Swa*_*jee 2 scala amazon-emr apache-spark
我已经构建了一个Spark RDD,其中这个RDD的每个元素都是一个表示XML Record的JAXB根元素.
我想拆分这个RDD,以便从这个集合产生6个RDD.本质上,这项工作只是将分层XML结构转换为6组平坦的CSV记录.我目前正在通过相同的RDD 6六次这样做.
xmlRdd.cache()
val rddofTypeA = xmlRdd.map {iterate over XML Object and create Type A}
rddOfTypeA.saveAsTextFile("s3://...")
val rddofTypeB = xmlRdd.map { iterate over XML Object and create Type B}
rddOfTypeB.saveAsTextFile("s3://...")
val rddofTypeC = xmlRdd.map { iterate over XML Object and create Type C}
rddOfTypeC.saveAsTextFile("s3://...")
val rddofTypeD = xmlRdd.map { iterate over XML Object and create Type D}
rddOfTypeD.saveAsTextFile("s3://...")
val rddofTypeE = xmlRdd.map { iterate over XML Object and create Type E}
rddOfTypeE.saveAsTextFile("s3://...")
val rddofTypeF = xmlRdd.map { iterate over XML Object and create Type F}
rddOfTypeF.saveAsTextFile("s3://...")
Run Code Online (Sandbox Code Playgroud)
我的输入数据集是35百万条记录,分为186个文件,每个448MB,存储在Amazon S3中.我的输出目录也在S3上.我正在使用EMR Spark.
使用六节点m4.4xlarge群集,需要38分钟才能完成此分割并写入输出.
有没有一种有效的方法来实现这一目标,而不是六次走过RDD?
最简单的解决方案(从Spark开发人员的角度来看)是在单独的线程上执行map和saveAsTextFile每个RDD.
未被广泛知晓(并因此被利用)的SparkContext是线程安全的事实,因此可用于从单独的线程提交作业.
话虽如此,您可以执行以下操作(使用最简单的Scala解决方案,Future但不一定是最好的,因为Future在实例化时启动计算而不是在您这样说时):
xmlRdd.cache()
import scala.concurrent.ExecutionContext.Implicits.global
val f1 = Future {
val rddofTypeA = xmlRdd.map { map xml to csv}
rddOfTypeA.saveAsTextFile("s3://...")
}
val f2 = Future {
val rddofTypeB = xmlRdd.map { map xml to csv}
rddOfTypeB.saveAsTextFile("s3://...")
}
...
Future.sequence(Seq(f1,f2)).onComplete { ... }
Run Code Online (Sandbox Code Playgroud)
这可能会缩短进行映射和保存的时间,但不会减少数据集上的扫描次数.无论如何,这应该不是什么大问题,因为数据集是缓存的,因此在内存和/或磁盘中(默认持久性级别MEMORY_AND_DISK在Spark SQL中Dataset.cache).
| 归档时间: |
|
| 查看次数: |
113 次 |
| 最近记录: |