我试图了解如何在spark中定义作业和阶段,为此我现在使用我在这里找到的代码并激发UI.为了在spark UI上看到它,我不得不多次将文本复制并粘贴到文件上,因此需要更多时间来处理.
这是spark UI的输出:
现在,我明白有三个工作因为有三个动作,而且由于随机动作而产生阶段,但我不明白为什么在工作1阶段4,5和6与阶段相同作业0的0,1和2与作业2相同.
我怎么知道哪些阶段不仅仅是一个只看到java代码的工作(在执行任何事情之前)?而且,为什么跳过第4和第9阶段,我怎么知道它会在执行之前发生?
我知道有三个工作,因为有三个行动
我甚至会说可能有更多的Spark工作,但最小数量是3
.这一切都取决于转换的实现和使用的动作.
我不明白为什么在Job 1阶段4,5和6与Job 0的0,1和2阶段相同,而Job 2也是如此.
作业1是在RDD上运行的某些操作的结果finalRdd
.这RDD使用创建(倒退)join
,textFile
,map
,和distinct
.
val people = sc.textFile("people.csv").map { line =>
val tokens = line.split(",")
val key = tokens(2)
(key, (tokens(0), tokens(1))) }.distinct
val cities = sc.textFile("cities.csv").map { line =>
val tokens = line.split(",")
(tokens(0), tokens(1))
}
val finalRdd = people.join(cities)
Run Code Online (Sandbox Code Playgroud)
运行上面的内容,您将看到相同的DAG.
现在,当您执行leftOuterJoin
或执行rightOuterJoin
操作时,您将获得另外两个DAG.您正在使用以前使用的RDD来运行新的Spark作业,因此您将看到相同的阶段.
为什么第4和第9阶段被跳过
通常,Spark会跳过某些阶段的执行.灰色阶段是已经计算过的阶段,因此Spark将重复使用它们,从而使性能更好.
我怎么知道哪些阶段不仅仅是一个只看到java代码的工作(在执行任何事情之前)?
这就是RDD血统(图)所提供的.
scala> people.leftOuterJoin(cities).toDebugString
res15: String =
(3) MapPartitionsRDD[99] at leftOuterJoin at <console>:28 []
| MapPartitionsRDD[98] at leftOuterJoin at <console>:28 []
| CoGroupedRDD[97] at leftOuterJoin at <console>:28 []
+-(2) MapPartitionsRDD[81] at distinct at <console>:27 []
| | ShuffledRDD[80] at distinct at <console>:27 []
| +-(2) MapPartitionsRDD[79] at distinct at <console>:27 []
| | MapPartitionsRDD[78] at map at <console>:24 []
| | people.csv MapPartitionsRDD[77] at textFile at <console>:24 []
| | people.csv HadoopRDD[76] at textFile at <console>:24 []
+-(3) MapPartitionsRDD[84] at map at <console>:29 []
| cities.csv MapPartitionsRDD[83] at textFile at <console>:29 []
| cities.csv HadoopRDD[82] at textFile at <console>:29 []
Run Code Online (Sandbox Code Playgroud)
正如你自己所看到的那样,你将最终得到4个阶段,因为有3个shuffle依赖项(带有分区数的边).
圆括号中的数字是DAGScheduler
最终用于创建具有确切任务数的任务集的分区数.TaskSet
每个阶段一个.
归档时间: |
|
查看次数: |
1810 次 |
最近记录: |