Mar*_*ace 1 scala dataframe apache-spark apache-spark-sql
给出以下代码:
dataFrame
.withColumn("A", myUdf1($"x")) // withColumn1 from x
.withColumn("B", myUdf2($"y")) // withColumn2 from y
Run Code Online (Sandbox Code Playgroud)
是否保证withColumn1会在之前执行withColumn2?
一个更好的例子:
dataFrame
.withColumn("A", myUdf1($"x")) // withColumn1 from x
.withColumn("B", myUdf2($"A")) // withColumn2 from A!!
Run Code Online (Sandbox Code Playgroud)
请注意,withColumn2 A是根据计算得出的withColumn1.
我问,因为我在同一代码的多次运行中得到了不一致的结果,我开始认为这可能是问题的根源.
编辑:添加了更详细的代码示例
val result = dataFrame
.groupBy("key")
.agg(
collect_list($"itemList").as("A"), // all items
collect_list(when($"click".isNotNull, $"itemList")).as("B") // subset of A
)
// create sparse item vector from all list of items A
.withColumn("vectorA", aggToSparseUdf($"A"))
// create sparse item vector from all list of items B (subset of A)
.withColumn("vectorB", aggToSparseUdf($"B"))
// calculate ratio vector B / A
.withColumn("ratio", divideVectors($"vectorB", $"vectorA"))
val keys: Seq[String] = result.head.getAs[Seq[String]]("key")
val values: Seq[SparseVector] = result.head.getAs[Seq[SparseVector]]("ratio")
Run Code Online (Sandbox Code Playgroud)
dataFrame,myUdf1将之前应用myUdf2; 然而:myUdf1将应用于所有记录的dataFrame之前myUdf2被应用到任何记录 -换句话说,myUdf2可能适用于一些记录之前,myUdf1已被应用到其他记录这是正确的,因为Spark可能将两个操作组合在一起形成一个阶段,并在每个分区的每个记录上执行此阶段(应用myUdf1和myUdf2).
如果您的UDF" 纯功能 "或"幂等",或者没有副作用,这不应该造成任何问题- 它们应该是,因为Spark假设所有转换都是这样.如果不是,Spark将无法通过"组合"转换,在不同分区上并行运行转换,重试转换等来优化执行.
编辑:如果要在将UDF2应用到任何记录之前强制 UDF1完全应用,则必须强制它们进入单独的阶段 - 这可以通过重新分区 DataFrame 来完成:
// sample data:
val dataFrame = Seq("A", "V", "D").toDF("x")
// two UDFs with "side effects" (printing to console):
val myUdf1 = udf[String, String](x => {
println("In UDF1")
x.toLowerCase
})
val myUdf2 = udf[String, String](x => {
println("In UDF2")
x.toUpperCase
})
// repartitioning between UDFs
dataFrame
.withColumn("A", myUdf1($"x"))
.repartition(dataFrame.rdd.partitions.length + 1)
.withColumn("B", myUdf2($"A"))
.show()
// prints:
// In UDF1
// In UDF1
// In UDF1
// In UDF2
// In UDF2
// In UDF2
Run Code Online (Sandbox Code Playgroud)
请注意,这也不是防弹的 - 例如,如果存在故障和重试,则订单可能再次是非确定性的.
| 归档时间: |
|
| 查看次数: |
896 次 |
| 最近记录: |