我需要从单独的源目录中读取JSON文件,并为每个目录创建单独的表.我希望这是并行完成的,但Spark不支持嵌套的RDD,所以目前它正在按顺序执行.是否有一个很好的解决方案可以并行读取/处理这些目录?
这是我正在尝试的示例片段,但由于嵌套的RDD,它不起作用:
def readJsonCreateTable(tableInfo: (String, String)) {
val df = spark
.read
.json(tableInfo._1)
df.createOrReplaceTempView(tableInfo._2)
}
val dirList = List(("/mnt/jsondir1", "temptable1"),
("/mnt/jsondir2", "temptable2"),
("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
Run Code Online (Sandbox Code Playgroud)
将最后一行更改为dirRDD.collect.foreach有效,但随后工作不会分发并按顺序执行,因此非常慢.
还尝试了dirRDD.collect.par.foreach,但是它只在驱动程序上运行并行线程,并且不使用所有其他节点.
我查看了foreachAsync,但由于嵌套,我不确定异步在这种情况下是否必然是并行的.
这是通过Databricks使用Spark 2.0和Scala 2.11.
===========
增加:
我尝试了foreachAsync,它在Spark中返回一个FutureAction,但也出现了错误.
import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
Run Code Online (Sandbox Code Playgroud)
而且显然SimpleFutureAction不可序列化
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction
Run Code Online (Sandbox Code Playgroud)