为什么我的火花工作有这么多的任务?默认情况下获取200个任务

uh_*_*boi 20 hadoop scala task apache-spark apache-spark-sql

我有一个spark作业,它接收来自hdfs的8条记录的文件,做一个简单的聚合并将其保存回hdfs.当我这样做时,我注意到有数百个任务.

我也不确定为什么有这么多工作?我觉得工作更像是一个动作发生的时候.我可以推测为什么 - 但我的理解是,在这段代码中它应该是一个工作,它应该分解为阶段,而不是多个工作.为什么不把它分解成各个阶段,为什么它会闯入工作岗位?

至于200多个任务,由于数据量和节点数量微乎其微,当只有一个聚合和几个过滤器时,每行数据有25个任务是没有意义的.为什么每个原子操作每个分区只有一个任务?

这是相关的scala代码 -

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object TestProj {object TestProj {
  def main(args: Array[String]) {

    /* set the application name in the SparkConf object */
    val appConf = new SparkConf().setAppName("Test Proj")

    /* env settings that I don't need to set in REPL*/
    val sc = new SparkContext(appConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")

     /*the below rdd will have schema defined in Record class*/
     val rddCase =  sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
      .map(x=>x.split(" "))    //file record into array of strings based spaces
      .map(x=>Record(
        x(0).toInt,
        x(1).asInstanceOf[String],
        x(2).asInstanceOf[String],
        x(3).toInt))


    /* the below dataframe groups on first letter of first name and counts it*/
    val aggDF = rddCase.toDF()
      .groupBy($"firstName".substr(1,1).alias("firstLetter"))
      .count
      .orderBy($"firstLetter")

    /* save to hdfs*/ 
 aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")

  }

    case class Record(id: Int
      , firstName: String
      , lastName: String
      , quantity:Int)

}
Run Code Online (Sandbox Code Playgroud)

下面是单击应用程序后的屏幕截图 在此输入图像描述

以下是查看id 0的特定"作业"时显示的阶段 在此输入图像描述

下面是单击具有200多个任务的舞台时屏幕的第一部分

在此输入图像描述

这是舞台内屏幕的第二部分在此输入图像描述

下面是点击"执行者"选项卡后 在此输入图像描述

根据要求,以下是工作ID 1的各个阶段

在此输入图像描述

以下是具有200个任务的作业ID 1中的阶段的详细信息

在此输入图像描述

mar*_*ios 29

这是一个经典的Spark问题.

用于读取的两个任务(第二个图中的阶段Id 0)defaultMinPartitions是设置为2的设置.您可以通过读取REPL中的值来获取此参数sc.defaultMinPartitions.它也应该在"环境"点击下的Spark UI中可见.

你可以看看github 的代码,看看这到底发生了什么.如果您希望在读取时使用更多分区,只需将其添加为参数,例如,sc.textFile("a.txt", 20).

现在有趣的部分来自第二阶段出现的200个分区(第二个阶段的阶段Id 1).好吧,每次有一个shuffle,Spark需要决定shuffle RDD有多少个分区.可以想象,默认值为200.

您可以使用以下方法更改:

sqlContext.setConf("spark.sql.shuffle.partitions", "4”)
Run Code Online (Sandbox Code Playgroud)

如果使用此配置运行代码,您将看到200个分区不再存在.如何设置此参数是一种艺术.也许选择2倍的核心数量(或其他).

我认为Spark 2.0有一种方法可以自动推断shuffle RDD的最佳分区数.期待那样!

最后,您获得的作业数量与生成的优化Dataframe代码产生的RDD操作数量有关.如果您阅读Spark规范,它会说每个RDD操作都会触发一个作业.当您的操作涉及Dataframe或SparkSQL时,Catalyst优化器将找出执行计划并生成一些基于RDD的代码来执行它.在你的情况下,很难确切地说它为什么会使用两个动作.您可能需要查看优化的查询计划,以确切了解正在执行的操作.