通过Scala Spark读取单独的目录并并行创建单独的RDD

TBh*_*mdi 2 scala apache-spark concurrent.futures rdd databricks

我需要从单独的源目录中读取JSON文件,并为每个目录创建单独的表.我希望这是并行完成的,但Spark不支持嵌套的RDD,所以目前它正在按顺序执行.是否有一个很好的解决方案可以并行读取/处理这些目录?

这是我正在尝试的示例片段,但由于嵌套的RDD,它不起作用:

def readJsonCreateTable(tableInfo: (String, String)) {
  val df = spark
           .read
           .json(tableInfo._1)
  df.createOrReplaceTempView(tableInfo._2)
}

val dirList = List(("/mnt/jsondir1", "temptable1"),
                   ("/mnt/jsondir2", "temptable2"),
                   ("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
Run Code Online (Sandbox Code Playgroud)

将最后一行更改为dirRDD.collect.foreach有效,但随后工作不会分发并按顺序执行,因此非常慢.

还尝试了dirRDD.collect.par.foreach,但是它只在驱动程序上运行并行线程,并且不使用所有其他节点.

我查看了foreachAsync,但由于嵌套,我不确定异步在这种情况下是否必然是并行的.

这是通过Databricks使用Spark 2.0和Scala 2.11.

===========
增加:

我尝试了foreachAsync,它在Spark中返回一个FutureAction,但也出现了错误.

import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
Run Code Online (Sandbox Code Playgroud)

而且显然SimpleFutureAction不可序列化

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction
Run Code Online (Sandbox Code Playgroud)

Jos*_*sen 5

您可以使用Scala 并行集合期货来并行化Spark驱动程序上运行的代码.Spark驱动程序是线程安全的,因此这将按预期工作.

以下是使用具有显式指定的线程池的并行集合的示例:

val dirList = List(
  ("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"),
  ("dbfs:/databricks-datasets/amazon/users/", "users")
).par

val pool = new scala.concurrent.forkjoin.ForkJoinPool(2)

try {
  dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool)
  dirList.foreach { case (filename, tableName) =>
    println(s"Starting to create table for $tableName")
    val df = spark.read.json(filename)
    println(s"Done creating table for $tableName")
    df.createOrReplaceTempView(tableName)
  }
} finally {
  pool.shutdown() // to prevent thread leaks.
  // You could also re-use thread pools across collections.
}
Run Code Online (Sandbox Code Playgroud)

当我在Databricks中运行它时,它产生了流日志输出,表明这两个表是并行加载的:

Starting to create table for departuredelays
Starting to create table for users
Done creating table for departuredelays
Done creating table for users
Run Code Online (Sandbox Code Playgroud)

这种并行性也反映在Spark UI的作业时间轴视图中.

当然,您也可以使用Java线程.简而言之,从多个线程调用Spark驱动程序API是安全的,因此选择您选择的JVM并发框架并发出对Spark驱动程序的并行调用来创建表.

  • 那个链接的SO问题正在讨论并行性_within_一个任务,这与在驱动程序上并行运行多个Spark作业不同. (2认同)