edu*_*ant 5 scala bigdata apache-spark
我想根据Spark官方文档了解Spark DAG模型。Spark 中的所有转换都是惰性的,默认情况下,每次对每个转换后的 RDD 运行操作时都可能会重新计算。所以我写了一个如下的小程序:
scala> val lines = sc.textFile("C:\\Spark\\README.md")
lines: org.apache.spark.rdd.RDD[String] = C:\Spark\README.md MapPartitionsRDD[1] at textFile at <console>:24
scala> val breakLInes = lines.flatMap(line=>line.split(" "))
breakLInes: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26
scala> val createTuple = breakLInes.map(line=>(line,1))
createTuple: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28
scala> val wordCount = createTuple.reduceByKey
reduceByKey reduceByKeyLocally
scala> val wordCount = createTuple.reduceByKey(_+_)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30
scala> wordCount.first
res0: (String, Int) = (package,1)
Run Code Online (Sandbox Code Playgroud)
现在转到下面的 Spark UI,这是我的第一个操作的 DAG 可视化:
再次执行:
scala> wordCount.first
res0: (String, Int) = (package,1)
Run Code Online (Sandbox Code Playgroud)
现在转到下面的 Spark UI,这是我的第二个操作的 DAG 可视化:
默认情况下,每次对每个转换后的 RDD 运行操作时,都可能会重新计算它。那么为什么它要跳过第 2 阶段,因为没有进行缓存,所以它应该再次计算第 2 阶段?
| 归档时间: |
|
| 查看次数: |
279 次 |
| 最近记录: |