Spark UI DAG阶段已断开连接

Jus*_*ond 6 scala apache-spark

我在spark-shell中运行了以下工作:

val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist
d.join(d.reduceByKey(_ + _)).collect
Run Code Online (Sandbox Code Playgroud)

Spark UI显示了三个阶段.阶段4和5对应于计算d,阶段6对应于collect动作的计算.既然d坚持下去,我只期望两个阶段.然而,阶段5不存在与任何其他阶段的连接.

Spark UI DAG

因此尝试在不使用持久化的情况下运行相同的计算,并且DAG看起来完全相同,除非没有指示RDD已被持久化的绿点.

Spark UI DAG没有持久化

我希望第11阶段的输出连接到第12阶段的输入,但事实并非如此.

看一下舞台描述,这些阶段似乎表明它d是持久的,因为第5阶段有输入,但我仍然对第5阶段甚至存在的原因感到困惑.

Spark UI阶段

Spark UI阶段没有持久化

zer*_*323 1

  1. 输入RDD被缓存并且缓存的部分不会被重新计算。

    这可以通过一个简单的测试来验证:

    import org.apache.spark.SparkContext
    
    def f(sc: SparkContext) = {
      val counter = sc.longAccumulator("counter")
      val rdd = sc.parallelize(0 until 100).map(i => {
        counter.add(1L)
        (i%10, i)
      }).persist
      rdd.join(rdd.reduceByKey(_ + _)).foreach(_ => ())
      counter.value
    }
    
    assert(f(spark.sparkContext) == 100)
    
    Run Code Online (Sandbox Code Playgroud)
  2. 缓存不会从 DAG 中删除阶段。

    如果数据被缓存,相应的阶段可以标记为已跳过,但仍然是 DAG 的一部分。可以使用检查点截断谱系,但这不是同一件事,并且它不会从可视化中删除阶段。

  3. 输入阶段包含的不仅仅是缓存的计算。

    Spark 阶段将操作组合在一起,这些操作可以链接起来而无需执行 shuffle。

    虽然输入阶段的一部分被缓存,但它并没有涵盖准备随机文件所需的所有操作。这就是您看不到跳过的任务的原因。

  4. 其余的(分离)只是图形可视化的限制。

  5. 如果您先重新分区数据:

    import org.apache.spark.HashPartitioner
    
    val d = sc.parallelize(0 until 1000000)
      .map(i => (i%100000, i))
      .partitionBy(new HashPartitioner(20))
    
    d.join(d.reduceByKey(_ + _)).collect
    
    Run Code Online (Sandbox Code Playgroud)

    您将获得您最有可能寻找的 DAG:

    在此输入图像描述