mjf*_*h93 5 scala user-defined-functions apache-spark apache-spark-sql
SPARK_VERSION = 2.2.0
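I ran into an interesting issue when trying to run a filter on a DataFrame that has a column added with a UDF. I was able to reproduce the problem with a smaller data set.
Given the dummy case classes: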
case class Info(number: Int, color: String)
case class Record(name: String, infos: Seq[Info])
And the following data:
val blue = Info(1, "blue")
val black = Info(2, "black")
val yellow = Info(3, "yellow")
val orange = Info(4, "orange")
val white = Info(5, "white")
val a = Record("a", Seq(blue, black, white))
val a2 = Record("a", Seq(yellow, white, orange))
val b = Record("b", Seq(blue, black))
val c = Record("c", Seq(white, orange))
val d = Record("d", Seq(orange, black))
Do the following...
Create two DataFrames (let's call them left and right):
val left = Seq(a, b).toDF
val right = Seq(a2, c, d).toDF
Join these DataFrames with a full_outer join and keep only the data from the right side:
val rightOnlyInfos = left.alias("l")
  .join(right.alias("r"), Seq("name"), "full_outer")
  .filter("l.infos is null")
  .select($"name", $"r.infos".as("r_infos"))
This produces the following result:
rightOnlyInfos.show(false)
+----+-----------------------+
|name|r_infos                |
+----+-----------------------+
|c   |[[5,white], [4,orange]]|
|d   |[[4,orange], [2,black]]|
+----+-----------------------+
Using the following UDF, add a new boolean column that indicates whether any of the r_infos contains the color black:
def hasBlack = (s: Seq[Row]) => {
  s.exists { case Row(num: Int, color: String) =>
    color == "black"
  }
}
val joinedBreakdown = rightOnlyInfos.withColumn("has_black", udf(hasBlack).apply($"r_infos"))
This is where I start seeing the problem. If I do the following, I get no error:
joinedBreakdown.show(false)
It produces the expected result:
+----+-----------------------+---------+
|name|r_infos                |has_black|
+----+-----------------------+---------+
|c   |[[5,white], [4,orange]]|false    |
|d   |[[4,orange], [2,black]]|true     |
+----+-----------------------+---------+
and the schema
joinedBreakdown.printSchema
shows
root
 |-- name: string (nullable = true)
 |-- r_infos: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- number: integer (nullable = false)
 |    |    |-- color: string (nullable = true)
 |-- has_black: boolean (nullable = true)
However, when I try to filter on that result, I get an error:
joinedBreakdown.filter("has_black == true").show(false)
The error is:
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$hasBlack$1: (array<struct<number:int,color:string>>) => boolean)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1075)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:411)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:127)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
at scala.collection.immutable.List.exists(List.scala:84)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$lzycompute$1(joins.scala:138)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$1(joins.scala:138)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:145)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:152)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:150)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:150)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:116)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
at org.apache.spark.sql.Dataset.show(Dataset.scala:646)
at org.apache.spark.sql.Dataset.show(Dataset.scala:623)
... 58 elided
Caused by: java.lang.NullPointerException
at $anonfun$hasBlack$1.apply(<console>:41)
at $anonfun$hasBlack$1.apply(<console>:40)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:92)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:91)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
... 114 more
Edit: I opened a JIRA issue. Pasting it here for tracking purposes: https://issues.apache.org/jira/browse/SPARK-22942
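This answer does not explain why the problem exists; it only covers the workarounds I found.
I ran into the same kind of problem. I'm not sure of the cause, but I have two workarounds that worked for me. Someone far smarter than me may be able to explain all of this to you, but here are my solutions to the problem.
First solution
Spark behaves as if the column doesn't exist yet, possibly because of some kind of filter pushdown. Force Spark to cache the result before filtering. This makes the column "exist".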
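// Assumes hasBlack is already wrapped with udf(...); with the question's plain function, use udf(hasBlack).apply($"r_infos") instead.
val joinedBreakdown = rightOnlyInfos.withColumn("has_black", hasBlack($"r_infos")).cache()
println(joinedBreakdown.count()) // Forces the cache to materialize after the UDF has been applied.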
joinedBreakdown.filter("has_black == true").show(false)
joinedBreakdown.filter("has_black == true").explain
Output
2
+----+-----------------------+---------+
|name|r_infos                |has_black|
+----+-----------------------+---------+
|d   |[[4,orange], [2,black]]|true     |
+----+-----------------------+---------+
== Physical Plan ==
*Filter (has_black#112632 = true)
+- InMemoryTableScan [name#112622, r_infos#112628, has_black#112632], [(has_black#112632 = true)]
      +- InMemoryRelation [name#112622, r_infos#112628, has_black#112632], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Project [coalesce(name#112606, name#112614) AS name#112622, infos#112615 AS r_infos#112628, UDF(infos#112615) AS has_black#112632]
               +- *Filter isnull(infos#112607)
                  +- SortMergeJoin [name#112606], [name#112614], FullOuter
                     :- *Sort [name#112606 ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(name#112606, 200)
                     :     +- LocalTableScan [name#112606, infos#112607]
                     +- *Sort [name#112614 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(name#112614, 200)
                           +- LocalTableScan [name#112614, infos#112615]
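Second solution
I'm not sure why this works, but do the same as you did, except add a try/catch inside the UDF. Before I get yelled at for this: note that using try/catch for control flow is an anti-pattern. To learn more, I recommend this question and its answers. Note: I edited your UDF slightly so it looks like something I'm more familiar with.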
def hasBlack = udf((s: Seq[Row]) => {
  try {
    s.exists { case Row(num: Int, color: String) =>
      color == "black"
    }
  } catch {
    case ex: Exception => false
  }
})
val joinedBreakdown = rightOnlyInfos.withColumn("has_black", hasBlack($"r_infos"))
joinedBreakdown.filter("has_black == true").explain
joinedBreakdown.filter("has_black == true").show(false)
Output
== Physical Plan ==
*Project [coalesce(name#112565, name#112573) AS name#112581, infos#112574 AS r_infos#112587, UDF(infos#112574) AS has_black#112591]
+- *Filter isnull(infos#112566)
   +- *BroadcastHashJoin [name#112565], [name#112573], RightOuter, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false]))
      :  +- *Filter isnotnull(name#112565)
      :     +- LocalTableScan [name#112565, infos#112566]
      +- *Filter (UDF(infos#112574) = true)
         +- LocalTableScan [name#112573, infos#112574]
+----+-----------------------+---------+
|name|r_infos                |has_black|
+----+-----------------------+---------+
|d   |[[4,orange], [2,black]]|true     |
+----+-----------------------+---------+
You can see that the query plan is different, because I force the UDF to be applied before the filter.
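A possible third workaround, offered as a sketch under an assumption rather than a confirmed diagnosis: the stack trace in the question shows EliminateOuterJoin.canFilterOutNull calling ScalaUDF.eval while the plan is being optimized, which suggests the optimizer evaluates the filter predicate against a row of nulls, so the UDF receives null for s. If that is what happens, making the function null-safe should avoid the NullPointerException without swallowing every exception. The names hasBlackSafe and hasBlackUdf are mine, not from the question.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Hypothetical null-safe variant of the question's hasBlack.
// Option(s) turns a null input into None, so exists returns false for it.
val hasBlackSafe: Seq[Row] => Boolean = (s: Seq[Row]) =>
  Option(s).exists(_.exists {
    case Row(_: Int, color: String) => color == "black"
    case _                          => false // null color or unexpected row shape
  })

// Wrap it as a UDF, then use it as in the question:
//   rightOnlyInfos.withColumn("has_black", hasBlackUdf($"r_infos"))
val hasBlackUdf = udf(hasBlackSafe)
```

Keeping the plain function separate from the udf(...) wrapper means the null handling can be checked without a SparkSession, for example by calling hasBlackSafe(null) directly.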