为什么sortBy转换会触发Spark作业?

Pra*_*jan 9 partitioning partitioner apache-spark rdd

根据Spark文档,只有RDD操作可以触发Spark作业,并且在对其调用操作时会对延迟进行转换评估.

我看到sortBy转换函数立即应用,它在SparkUI中显示为作业触发器.为什么?

use*_*411 9

sortBy使用sortByKey它实现依赖于RangePartitioner(JVM)或分区函数(Python).当您急切地初始化调用sortBy/ sortByKeypartitioner(分区函数)并对输入RDD进行采样以计算分区边界时.你看到的工作对应于这个过程.

仅当您对新创建的RDD或其后代执行操作时,才会执行实际排序.


Jac*_*ski 5

根据 Spark 文档,只有操作触发 Spark 中的作业,当对其调用操作时会延迟评估转换。

总的来说,您是对的,但正如您刚刚经历的那样,很少有例外,并且sortBy是其中之一(带有zipWithIndex)。

事实上,它是在 Spark 的 JIRA 中报告的,并以 Won't Fix 解决方案关闭。请参阅SPARK-1021 sortByKey() 在不应启动集群作业时启动集群作业

您可以看到在DAGScheduler启用日志记录的情况下运行的作业(以及稍后在 Web UI 中):

scala> sc.parallelize(0 to 8).sortBy(identity)
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
DEBUG DAGScheduler: submitStage(ResultStage 1)
DEBUG DAGScheduler: missing: List()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25)
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4)
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25
Run Code Online (Sandbox Code Playgroud)