如何知道(广播)连接查询中的Spark作业和阶段的数量?

suj*_*jit 8 apache-spark apache-spark-sql

我使用Spark 2.1.2.

我试图了解各种火花UI选项卡显示作为一个工作运行.我使用spark-shell --master local并执行以下join查询:

val df = Seq(
  (55, "Canada", -1, "", 0),
  (77, "Ontario", 55, "/55", 1),
  (100, "Toronto", 77, "/55/77", 2),
  (104, "Brampton", 100, "/55/77/100", 3)
).toDF("id", "name", "parentId", "path", "depth")

val dfWithPar = df.as("df1").
  join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter").
  select($"df1.*", $"df2.name" as "parentName")

dfWithPar.show
Run Code Online (Sandbox Code Playgroud)

这是物理查询计划:

== Physical Plan ==
*Project [Id#11, name#12, parentId#13, path#14, depth#15, name#25 AS parentName#63]
+- *BroadcastHashJoin [parentId#13], [Id#24], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [Id#24, name#25]
Run Code Online (Sandbox Code Playgroud)

我有两个关于查询执行的问题.

  1. 为什么查询有两个作业?

    火花工作视图

  2. 为什么两个作业的舞台视图相同?下面是作业ID 1的阶段视图的屏幕截图,它与作业ID 0完全相同.为什么?

    阶段1的阶段视图与阶段0完全相同

Jac*_*ski 13

我使用Spark 2.3.0来回答你的问题(实际上是2.3.1-SNAPSHOT),因为它是本文撰写时的最新版本.由于2.1.2和我的2.3.0中的物理查询计划完全相同(除了圆括号中的每个查询codegen阶段ID),因此查询执行(如果有任何重要)的变化很小.


dfWithPar.show结构化查询(你使用Scala的星火SQL的数据集API构建的)进行了优化,下面的物理查询计划(我包括在我的答案更好的理解).

scala> dfWithPar.explain
== Physical Plan ==
*(1) Project [Id#11, name#12, parentId#13, path#14, depth#15, name#24 AS parentName#58]
+- *(1) BroadcastHashJoin [parentId#13], [Id#23], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [Id#23, name#24]
Run Code Online (Sandbox Code Playgroud)

Spark职位数量

为什么查询有两个作业?

我会说甚至还有三个Spark工作.

Web UI中的广播连接查询的Spark作业

tl; dr One Spark工作用于BroadcastHashJoinExec物理操作员,而另外两个用于Dataset.show.

为了理解查询执行和结构化查询的Spark作业数,理解结构化查询(使用数据集API描述)和RDD API之间的区别非常重要.

Spark SQL的数据集和Spark Core的RDD都描述了Spark中的分布式计算.RDD是Spark"汇编程序"语言(类似于JVM字节码),而数据集是使用类似SQL语言的结构化查询的更高级别描述(类似于Scala或Java等JVM语言,与我之前使用的JVM字节码相比).

重要的是,使用数据集API的结构化查询最终会最终成为基于RDD的分布式计算(可以将其与Java或Scala编译器如何将高级语言转换为JVM字节码进行比较).

数据集API是对RDD API的抽象,当您在DataFrame或数据集上调用该操作时,该操作会将其转换为RDD.

有了这个,你不应该对Dataset.show最终调用RDD操作感到惊讶, RDD操作又将运行零个,一个或多个Spark作业.

Dataset.show(numRows默认情况下等于20)最后调用showString,取(numRows + 1)得到一个Array[Row].

val takeResult = newDf.select(castCols: _*).take(numRows + 1)
Run Code Online (Sandbox Code Playgroud)

换句话说,dfWithPar.show()就Spark工作的数量而言,相当于dfWithPar.take(21)其等价dfWithPar.head(21).

您可以在SQL选项卡中查看它们及其作业数.它们都应该是平等的.

Web UI中的SQL选项卡

showtakehead所有导致collectFromPlan触发火花的作业(通过调用executeCollect).

您应该确定回答关于作业数量的问题是了解查询中的所有物理运算符是如何工作的.您只需要了解它们在运行时的行为以及它们是否会触发Spark作业.

在此输入图像描述

BroadcastHashJoin和BroadcastExchangeExec物理运营商

BroadcastHashJoinExec二进制物理运算时使用的连接的右侧可以被广播(精确地spark.sql.autoBroadcastJoinThreshold说是10M通过默认值).

BroadcastExchangeExec一元物理运算符用于向工作节点(支持BroadcastHashJoinExec)广播行(关系).

BroadcastHashJoinExec被执行(以产生RDD[InternalRow]),它创建了一个广播变量,反过来执行BroadcastExchangeExec(在一个单独的线程).

这就是运行ThreadPoolExecutor.java:1149 Spark作业0 的原因.

如果执行以下操作,您可以看到单个Spark作业0已运行:

// Just a single Spark job for the broadcast variable
val r = dfWithPar.rdd
Run Code Online (Sandbox Code Playgroud)

这要求执行结构化查询以生成RDD,然后该RDD是给出最终结果的动作的目标.

在此输入图像描述

如果你没有结束广播连接查询,你就不会有Spark工作.

RDD.take运算符

我错过了最初的那一刻,我回答的问题是,该数据集的运营商,即show,take并且head,将最终导致RDD.take.

take(num:Int):Array [T]获取RDD的前几个num元素.它的工作原理是首先扫描一个分区,然后使用该分区的结果来估计满足限制所需的其他分区数.

请注意,如果take"它的工作原理是先扫描一个分区,然后使用结果从分区来估算,以满足极限需要额外的分区数目." 这是了解广播连接查询中Spark作业数量的关键.

每次迭代(在上面的描述中)都是一个单独的Spark作业,从第一个分区开始,每次后续迭代的次数是4次:

// RDD.take
def take(num: Int): Array[T] = withScope {
  ...
  while (buf.size < num && partsScanned < totalParts) {
    ...
    val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
    ...
  }
}
Run Code Online (Sandbox Code Playgroud)

看看以下RDD.take21行.

// The other two Spark jobs
r.take(21)
Run Code Online (Sandbox Code Playgroud)

您将在查询中获得2个Spark作业.

在此输入图像描述

猜猜执行时你将拥有多少个Spark工作dfWithPar.show(1).

为什么阶段是一样的?

为什么两个作业的舞台视图相同?下面是作业ID 1的阶段视图的屏幕截图,它与作业ID 0完全相同.为什么?

这很容易回答,因为两个Spark工作都来自RDD.take(20).

第一个Spark作业是扫描第一个分区,因为没有足够的行导致另一个Spark作业扫描更多分区.