如何在Spark中并行读写多个表?

wdz*_*wdz 6 parallel-processing scala apache-spark apache-spark-sql

在我的Spark应用程序中,我尝试从RDBMS读取多个表,进行一些数据处理,然后将多个表写入另一个RDBMS,如下所示(在Scala中):

val reading1 = sqlContext.load("jdbc", Map("url" -> myurl1, "dbtable" -> mytable1))
val reading2 = sqlContext.load("jdbc", Map("url" -> myurl1, "dbtable" -> mytable2))
val reading3 = sqlContext.load("jdbc", Map("url" -> myurl1, "dbtable" -> mytable3))

// data processing
// ..............

myDF1.write.mode("append").jdbc(myurl2, outtable1, new java.util.Properties)
myDF2.write.mode("append").jdbc(myurl2, outtable2, new java.util.Properties)
myDF3.write.mode("append").jdbc(myurl2, outtable3, new java.util.Properties)
Run Code Online (Sandbox Code Playgroud)

我了解可以使用分区并行读取一个表。但是,read1,read2,read3的读操作似乎是顺序的,myDF1,myDF2,myDF3的写操作也是如此。

如何并行读取多个表(mytable1,mytable2,mytable3)?并且还并行写入多个表(我认为逻辑相同)?

小智 3

您可以将模式安排为 FAIR,它应该并行运行任务。\n https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application

\n\n

应用程序内的调度\n在给定的 Spark 应用程序(SparkContext 实例)内,如果多个并行作业是从单独的线程提交的,则它们可以同时运行。在本节中,\xe2\x80\x9cjob\xe2\x80\x9d 是指 Spark 操作(例如保存、收集)以及需要运行以评估该操作的任何任务。Spark\xe2\x80\x99s 调度程序是完全线程安全的,并支持此用例以启用服务多个请求(例如多个用户的查询)的应用程序。

\n\n

默认情况下,Spark\xe2\x80\x99s 调度程序以 FIFO 方式运行作业。每个作业分为\xe2\x80\x9cstages\xe2\x80\x9d(例如map和reduce阶段),第一个作业在所有可用资源上获得优先级,而其阶段有任务要启动,然后第二个作业获得优先级,如果队列头部的作业不\xe2\x80\x99t需要使用整个集群,则后面的作业可以立即开始运行,但如果队列头部的作业很大,则后面的作业可能会显着延迟。

\n\n

从 Spark 0.8 开始,还可以在作业之间配置公平共享。在公平共享下,Spark 以 \xe2\x80\x9cround robin\xe2\x80\x9d 方式在作业之间分配任务,以便所有作业获得大致相等的集群资源份额。这意味着在长作业运行时提交的短作业可以立即开始接收资源,并且仍然可以获得良好的响应时间,而无需等待长作业完成。此模式最适合多用户设置。

\n\n
val conf = new SparkConf().setMaster(...).setAppName(...)\nconf.set("spark.scheduler.mode", "FAIR")\nval sc = new SparkContext(conf)\n
Run Code Online (Sandbox Code Playgroud)\n