Hoa*_*yen 8 scala user-defined-functions apache-spark
我已经使用 Spark 2.4 一段时间了,最近几天才开始切换到 Spark 3.0。切换到 Spark 3.0 运行后出现此错误udf((x: Int) => x, IntegerType):
Caused by: org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. To get rid of this error, you could:
1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`
2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { override def call(s: String): Integer = s.length() }, IntegerType)`, if input types are all non primitive
3. set spark.sql.legacy.allowUntypedScalaUDF to true and use this API with caution;
Run Code Online (Sandbox Code Playgroud)
这些解决方案是 Spark 本身提出的,经过谷歌搜索一段时间后,我到达了 Spark 迁移指南页面:
在 Spark 3.0 中,默认情况下不允许使用 org.apache.spark.sql.functions.udf(AnyRef, DataType)。建议删除返回类型参数以自动切换到类型化 Scala udf,或将spark.sql.legacy.allowUntypedScalaUDF 设置为 true 以继续使用它。在 Spark 2.4 及更低版本中,如果 org.apache.spark.sql.functions.udf(AnyRef, DataType) 获取带有基元类型参数的 Scala 闭包,则如果输入值为 null,则返回的 UDF 将返回 null。但是,在Spark 3.0中,如果输入值为null,UDF将返回Java类型的默认值。例如,val f = udf((x: Int) => x, IntegerType), f($"x") 在 Spark 2.4 及以下版本中,如果 x 列为 null,则返回 null;在 Spark 3.0 中,返回 0。引入此行为更改是因为 Spark 3.0 默认使用 Scala 2.12 构建。
来源:Spark 迁移指南
我注意到我使用function.udfAPI 的常用方式udf(AnyRef, DataType)被称为 ,UnTyped Scala UDF而建议的解决方案udf(AnyRef)被称为Typed Scala UDF。
udf,即(x:Int) => x,显然已经定义了其输入类型,但 Spark 声称You're using untyped Scala UDF, which does not have the input type information?我的理解正确吗?即使经过更深入的搜索,我仍然找不到任何材料来解释什么是 UnTyped Scala UDF 和什么是 Typed Scala UDF。
所以我的问题是:它们是什么?他们有什么区别?
在类型化 scala UDF 中,UDF 知道作为参数传递的列的类型,而在非类型化 scala UDF 中,UDF 不知道作为参数传递的列的类型
创建类型化 scala UDF 时,作为参数传递的列类型和 UDF 的输出是根据函数参数和输出类型推断的,而创建非类型化 scala UDF 时,无论是参数还是输出,根本不存在类型推断。
令人困惑的是,在创建类型化 UDF 时,类型是从函数推断出来的,而不是作为参数显式传递。更明确地说,您可以编写类型化 UDF 创建,如下所示:
val my_typed_udf = udf[Int, Int]((x: Int) => Int)
Run Code Online (Sandbox Code Playgroud)
现在,我们来看看您提出的两点。
据我了解,第一个(例如
udf(AnyRef, DataType))看起来比第二个(例如udf(AnyRef))的类型更严格,其中第一个有显式定义的输出类型,而第二个则没有,因此我对为什么它被称为 UnTyped 感到困惑。
根据Spark Functions scaladocudf ,对于第一个函数,将函数转换为 UDF 的函数签名实际上是:
def udf(f: AnyRef, dataType: DataType): UserDefinedFunction
Run Code Online (Sandbox Code Playgroud)
对于第二个:
def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction
Run Code Online (Sandbox Code Playgroud)
因此,第二个实际上比第一个更具类型,因为第二个考虑了作为参数传递的函数的类型,而第一个则删除了函数的类型。
这就是为什么在第一个中您需要定义返回类型,因为 Spark 需要此信息,但无法从作为参数传递的函数中推断出它,因为它的返回类型被删除,而在第二个中,返回类型是从作为参数传递的函数中推断出来的争论。
该函数还被传递给
udf,即(x:Int) => x,显然已经定义了其输入类型,但 Spark 声称You're using untyped Scala UDF, which does not have the input type information?
这里重要的不是函数,而是 Spark 如何从该函数创建 UDF。
在这两种情况下,要转换为 UDF 的函数都定义了其输入和返回类型,但在使用udf(AnyRef, DataType).