为什么我的 Spark 应用程序只在 1 个执行程序中运行?

Ale*_* K. 5 amazon-emr apache-spark spark-dataframe

我对 Spark 还是比较陌生,但我已经能够创建 Spark 应用程序我需要能够使用 JDBC 驱动程序从我们的 SQL Server 重新处理数据(我们正在删除昂贵的 SP),该应用程序从 Sql Server 加载一些表通过 JDBC 进入数据帧,然后我做了一些连接、一个组和一个过滤器,最后通过 JDBC 将一些数据重新插入到不同的表中。所有这些在 Amazon Web Services 中的 Spark EMR 中执行得很好,在 m3.xlarge 中有 2 个内核,大约一分钟。

我的问题如下: 1. 现在我在集群上有 1 个主节点和 2 个内核,但是每次我启动一个新步骤时,从我从历史服务器中看到的情况来看,似乎只有 1 个执行程序被用作我可以看到列出了 2 个执行程序,驱动程序根本没有使用,一个 id 为 1 的执行程序处理大约 1410 个任务。我完全不确定如何进行。

这也是 AWS 特有的,但我不想发布 2 个问题,因为它们有某种关联,有什么办法可以同时运行 2 个步骤?这意味着能够同时运行这个进程的 2 个 spark-submits,因为我们每天多次运行这个进程(它处理客户端数据)。我知道我可以通过该步骤启动一个新集群,但我希望能够快速进行处理,而只是启动一个新集群需要很长时间。谢谢!!!

Dan*_*ula 5

对于你的第一个问题:

我不确定是否是这种情况,但类似的事情发生在我们身上,也许它会有所帮助。

如果您正在使用sqlContext.read.format("jdbc").load()(或类似方式)从 JDBC 源中读取数据,则默认情况下不会对结果数据帧进行分区。因此,如果您是这种情况,则在不首先对其进行分区的情况下在结果数据帧中应用转换将导致只有一个执行程序能够处理它。如果不是您的情况,以下解决方案可能无法解决您的问题。

因此,我们的解决方案是在数据中创建一个数值从 1 到 32(我们所需的分区数)的数值列,并通过设置 jdbc 读取器的分区选项将其用作分区列(请查看此链接):

val connectionOptions = Map[String, String] (... <connection options> ...)
val options = connectionOptions ++ Map[String, String] (
    "partitionColumn" -> "column name", 
    "lowerBound" -> "1", 
    "upperBound" -> "32", 
    "numPartitions" -> "32"
)

val df = sqlContext.read.format("jdbc").options(options).load()
Run Code Online (Sandbox Code Playgroud)

因此,通过这种方法,不仅可以并行处理读取任务(真正提高性能并避免 OOM 错误),而且可以对生成的数据帧进行分区并并行处理所有后续转换。

我希望这有帮助。