对Spark Dataframe的val vs def性能

the*_*tom 1 scala apache-spark

下面的代码,因此有关性能的问题 - 当然是大规模的想象:

import org.apache.spark.sql.types.StructType

val df = sc.parallelize(Seq(
   ("r1", 1, 1),
   ("r2", 6, 4),
   ("r3", 4, 1),
   ("r4", 1, 2)
   )).toDF("ID", "a", "b")

val ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

// or

def ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

df.withColumn("ones", ones).explain
Run Code Online (Sandbox Code Playgroud)

这里有两个使用def和val的物理计划 - 它们是相同的:

 == Physical Plan == **def**
 *(1) Project [_1#760 AS ID#764, _2#761 AS a#765, _3#762 AS b#766, (CASE WHEN (_2#761 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#762 = 1) THEN 1 ELSE 0 END) AS ones#770]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#760, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#761, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#762]
   +- Scan[obj#759]


 == Physical Plan == **val**
 *(1) Project [_1#780 AS ID#784, _2#781 AS a#785, _3#782 AS b#786, (CASE WHEN (_2#781 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#782 = 1) THEN 1 ELSE 0 END) AS ones#790]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#780, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#781, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#782]
    +- Scan[obj#779] 
Run Code Online (Sandbox Code Playgroud)

所以,有讨论:

val vs def表现.

然后:

  • 我认为.explains没有区别.好.

  • 来自其他地方:val在定义时评估,def-在被调用时.

  • 我假设在这里使用val或def没有区别,因为它基本上在一个循环中并且有一个减少.它是否正确?
  • df.schema.map(C => c.name).drop(1)被每数据帧行执行?当然没有必要.Catalyst会优化这个吗?
  • 如果上述情况是正确的,每次执行语句都要执行,那么我们怎样才能使这段代码只出现一次?我们应该制作val的val = df.schema.map(c => c.name).drop(1)
  • val,def比Scala更多,也是Spark组件.

对于-1er我问这个因为以下是非常明确的,但是val的代码比下面的代码更多,而且下面没有迭代:

var x = 2 // using var as I need to change it to 3 later
val sq = x*x // evaluates right now
x = 3 // no effect! sq is already evaluated
println(sq)
Run Code Online (Sandbox Code Playgroud)

Yuv*_*kov 5

这里有两个核心概念,Spark DAG创建和评估,以及Scala val和vs def定义,这些是正交的

我认为.explains没有区别

您没有看到任何区别,因为从Spark的角度来看,查询是相同的.如果您将图形存储在a中,val或者每次使用a创建它,则与分析器无关def.

来自其他地方:val在定义时评估,def-在被调用时.

这是Scala语义.A val是一个不可变引用,它在声明站点进行一次评估.A def代表方法定义,如果在其中分配新DataFrame内容,则每次调用它时都会创建一个.例如:

def ones = 
  df
   .schema
   .map(c => c.name)
   .drop(1)
   .map(x => when(col(x) === 1, 1).otherwise(0))
   .reduce(_ + _)

val firstcall = ones
val secondCall = ones
Run Code Online (Sandbox Code Playgroud)

上面的代码将在DF上构建两个独立的DAG.

我假设在这里使用val或def没有区别,因为它基本上在一个循环中并且有一个减少.它是否正确?

我不确定你在谈论哪个循环,但请看上面的答案,区分两者.

每个数据帧行都会执行df.schema.map(c => c.name).drop(1)吗?当然没有必要.Catalyst会优化这个吗?

不会,drop(1)整个数据框都会发生,这实质上会使它只丢弃第一行.

如果上述情况是正确的,每次执行语句都要执行,那么我们怎样才能使这段代码只出现一次?我们应该制作val的val = df.schema.map(c => c.name).drop(1)

每个数据帧只发生一次(在您的示例中我们只有一个).