Spark DataFrame:是否保证withColumn的顺序?

Mar*_*ace 1 scala dataframe apache-spark apache-spark-sql

给出以下代码:

dataFrame
    .withColumn("A", myUdf1($"x")) // withColumn1 from x
    .withColumn("B", myUdf2($"y")) // withColumn2 from y
Run Code Online (Sandbox Code Playgroud)

是否保证withColumn1会在之前执行withColumn2

一个更好的例子:

dataFrame
    .withColumn("A", myUdf1($"x")) // withColumn1 from x
    .withColumn("B", myUdf2($"A")) // withColumn2 from A!!
Run Code Online (Sandbox Code Playgroud)

请注意,withColumn2 A是根据计算得出的withColumn1.

我问,因为我在同一代码的多次运行中得到了不一致的结果,我开始认为这可能是问题的根源.

编辑:添加了更详细的代码示例

val result = dataFrame
  .groupBy("key")
  .agg(
    collect_list($"itemList").as("A"),                          // all items
    collect_list(when($"click".isNotNull, $"itemList")).as("B") // subset of A
  )
  // create sparse item vector from all list of items A
  .withColumn("vectorA", aggToSparseUdf($"A"))                  
  // create sparse item vector from all list of items B (subset of A)
  .withColumn("vectorB", aggToSparseUdf($"B"))      
  // calculate ratio vector B / A
  .withColumn("ratio", divideVectors($"vectorB", $"vectorA"))

val keys: Seq[String] = result.head.getAs[Seq[String]]("key")
val values: Seq[SparseVector] = result.head.getAs[Seq[SparseVector]]("ratio")
Run Code Online (Sandbox Code Playgroud)

Tza*_*har 6

  1. IS保证每个特定的记录dataFrame,myUdf1将之前应用myUdf2; 然而:
  2. 保证myUdf1将应用于所有记录dataFrame之前myUdf2被应用到任何记录 -换句话说,myUdf2可能适用于一些记录之前,myUdf1已被应用到其他记录

这是正确的,因为Spark可能将两个操作组合在一起形成一个阶段,并在每个分区的每个记录上执行此阶段(应用myUdf1myUdf2).

如果您的UDF" 纯功能 "或"幂等",或者没有副作用,这不应该造成任何问题- 它们应该是,因为Spark假设所有转换都是这样.如果不是,Spark将无法通过"组合"转换,在不同分区上并行运行转换,重试转换等来优化执行.

编辑:如果要在将UDF2应用到任何记录之前强制 UDF1完全应用,则必须强制它们进入单独的阶段 - 这可以通过重新分区 DataFrame 来完成:

// sample data:
val dataFrame = Seq("A", "V", "D").toDF("x")

// two UDFs with "side effects" (printing to console):
val myUdf1 = udf[String, String](x => {
  println("In UDF1")
  x.toLowerCase
})

val myUdf2 = udf[String, String](x => {
  println("In UDF2")
  x.toUpperCase
})

// repartitioning between UDFs
dataFrame
  .withColumn("A", myUdf1($"x"))
  .repartition(dataFrame.rdd.partitions.length + 1)
  .withColumn("B", myUdf2($"A"))
  .show()

// prints:
// In UDF1
// In UDF1
// In UDF1
// In UDF2
// In UDF2
// In UDF2
Run Code Online (Sandbox Code Playgroud)

请注意,这也不是防弹的 - 例如,如果存在故障和重试,则订单可能再次是非确定性的.