uh_*_*boi 20 hadoop scala task apache-spark apache-spark-sql
我有一个spark作业,它接收来自hdfs的8条记录的文件,做一个简单的聚合并将其保存回hdfs.当我这样做时,我注意到有数百个任务.
我也不确定为什么有这么多工作?我觉得工作更像是一个动作发生的时候.我可以推测为什么 - 但我的理解是,在这段代码中它应该是一个工作,它应该分解为阶段,而不是多个工作.为什么不把它分解成各个阶段,为什么它会闯入工作岗位?
至于200多个任务,由于数据量和节点数量微乎其微,当只有一个聚合和几个过滤器时,每行数据有25个任务是没有意义的.为什么每个原子操作每个分区只有一个任务?
这是相关的scala代码 -
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object TestProj {object TestProj {
def main(args: Array[String]) {
/* set the application name in the SparkConf object */
val appConf = new SparkConf().setAppName("Test Proj")
/* env settings that I don't need to set in REPL*/
val sc = new SparkContext(appConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
/*the below rdd will have schema defined in Record class*/
val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
.map(x=>x.split(" ")) //file record into array of strings based spaces
.map(x=>Record(
x(0).toInt,
x(1).asInstanceOf[String],
x(2).asInstanceOf[String],
x(3).toInt))
/* the below dataframe groups on first letter of first name and counts it*/
val aggDF = rddCase.toDF()
.groupBy($"firstName".substr(1,1).alias("firstLetter"))
.count
.orderBy($"firstLetter")
/* save to hdfs*/
aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")
}
case class Record(id: Int
, firstName: String
, lastName: String
, quantity:Int)
}
Run Code Online (Sandbox Code Playgroud)
下面是单击具有200多个任务的舞台时屏幕的第一部分
根据要求,以下是工作ID 1的各个阶段
以下是具有200个任务的作业ID 1中的阶段的详细信息
mar*_*ios 29
这是一个经典的Spark问题.
用于读取的两个任务(第二个图中的阶段Id 0)defaultMinPartitions是设置为2的设置.您可以通过读取REPL中的值来获取此参数sc.defaultMinPartitions.它也应该在"环境"点击下的Spark UI中可见.
你可以看看github 的代码,看看这到底发生了什么.如果您希望在读取时使用更多分区,只需将其添加为参数,例如,sc.textFile("a.txt", 20).
现在有趣的部分来自第二阶段出现的200个分区(第二个阶段的阶段Id 1).好吧,每次有一个shuffle,Spark需要决定shuffle RDD有多少个分区.可以想象,默认值为200.
您可以使用以下方法更改:
sqlContext.setConf("spark.sql.shuffle.partitions", "4”)
Run Code Online (Sandbox Code Playgroud)
如果使用此配置运行代码,您将看到200个分区不再存在.如何设置此参数是一种艺术.也许选择2倍的核心数量(或其他).
我认为Spark 2.0有一种方法可以自动推断shuffle RDD的最佳分区数.期待那样!
最后,您获得的作业数量与生成的优化Dataframe代码产生的RDD操作数量有关.如果您阅读Spark规范,它会说每个RDD操作都会触发一个作业.当您的操作涉及Dataframe或SparkSQL时,Catalyst优化器将找出执行计划并生成一些基于RDD的代码来执行它.在你的情况下,很难确切地说它为什么会使用两个动作.您可能需要查看优化的查询计划,以确切了解正在执行的操作.
| 归档时间: |
|
| 查看次数: |
9877 次 |
| 最近记录: |