小编TBh*_*mdi的帖子

通过Scala Spark读取单独的目录并并行创建单独的RDD

我需要从单独的源目录中读取JSON文件,并为每个目录创建单独的表.我希望这是并行完成的,但Spark不支持嵌套的RDD,所以目前它正在按顺序执行.是否有一个很好的解决方案可以并行读取/处理这些目录?

这是我正在尝试的示例片段,但由于嵌套的RDD,它不起作用:

def readJsonCreateTable(tableInfo: (String, String)) {
  val df = spark
           .read
           .json(tableInfo._1)
  df.createOrReplaceTempView(tableInfo._2)
}

val dirList = List(("/mnt/jsondir1", "temptable1"),
                   ("/mnt/jsondir2", "temptable2"),
                   ("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
Run Code Online (Sandbox Code Playgroud)

将最后一行更改为dirRDD.collect.foreach有效,但随后工作不会分发并按顺序执行,因此非常慢.

还尝试了dirRDD.collect.par.foreach,但是它只在驱动程序上运行并行线程,并且不使用所有其他节点.

我查看了foreachAsync,但由于嵌套,我不确定异步在这种情况下是否必然是并行的.

这是通过Databricks使用Spark 2.0和Scala 2.11.

===========
增加:

我尝试了foreachAsync,它在Spark中返回一个FutureAction,但也出现了错误.

import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
Run Code Online (Sandbox Code Playgroud)

而且显然SimpleFutureAction不可序列化

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction
Run Code Online (Sandbox Code Playgroud)

scala apache-spark concurrent.futures rdd databricks

2
推荐指数
1
解决办法
502
查看次数