Hex*_*rks 7 java user-defined-functions kotlin apache-spark
我正在尝试编写一个返回复杂类型的 UDF:
private val toPrice = UDF1<String, Map<String, String>> { s ->
val elements = s.split(" ")
mapOf("value" to elements[0], "currency" to elements[1])
}
val type = DataTypes.createStructType(listOf(
DataTypes.createStructField("value", DataTypes.StringType, false),
DataTypes.createStructField("currency", DataTypes.StringType, false)))
df.sqlContext().udf().register("toPrice", toPrice, type)
Run Code Online (Sandbox Code Playgroud)
但任何时候我使用这个:
df = df.withColumn("price", callUDF("toPrice", col("price")))
Run Code Online (Sandbox Code Playgroud)
我收到一个神秘的错误:
private val toPrice = UDF1<String, Map<String, String>> { s ->
val elements = s.split(" ")
mapOf("value" to elements[0], "currency" to elements[1])
}
val type = DataTypes.createStructType(listOf(
DataTypes.createStructField("value", DataTypes.StringType, false),
DataTypes.createStructField("currency", DataTypes.StringType, false)))
df.sqlContext().udf().register("toPrice", toPrice, type)
Run Code Online (Sandbox Code Playgroud)
我尝试使用自定义数据类型:
class Price(val value: Double, val currency: String) : Serializable
Run Code Online (Sandbox Code Playgroud)
带有返回该类型的 UDF:
private val toPrice = UDF1<String, Price> { s ->
val elements = s.split(" ")
Price(elements[0].toDouble(), elements[1])
}
Run Code Online (Sandbox Code Playgroud)
但后来我得到另一个MatchError抱怨Price类型。
如何正确编写可以返回复杂类型的 UDF?
TL;DR该函数应该返回一个类的对象org.apache.spark.sql.Row。
Spark 提供了两种主要的UDF定义变体。
udf 使用 Scala 反射的变体:
def udf[RT](f: () ? RT)(implicit arg0: TypeTag[RT]): UserDefinedFunctiondef udf[RT, A1](f: (A1) ? RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1]): UserDefinedFunctiondef udf[RT, A1, A2, ..., A10](f: (A1, A2, ..., A10) ? RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1], arg2: TypeTag[A2], ..., arg10: TypeTag[A10])其中定义
作为用户定义函数 (UDF) 的 ... 参数的 Scala 闭包。数据类型是根据 Scala 闭包的签名自动推断的。
这些变体在没有模式的情况下使用原子或代数数据类型。例如,有问题的函数将在 Scala 中定义:
case class Price(value: Double, currency: String)
val df = Seq("1 USD").toDF("price")
val toPrice = udf((s: String) => scala.util.Try {
s split(" ") match {
case Array(price, currency) => Price(price.toDouble, currency)
}
}.toOption)
df.select(toPrice($"price")).show
// +----------+
// |UDF(price)|
// +----------+
// |[1.0, USD]|
// +----------+
Run Code Online (Sandbox Code Playgroud)
在这个变体中,返回类型是自动编码的。
由于它依赖于反射,这个变体主要面向 Scala 用户。
udf提供模式定义的变体(您在此处使用的一个)。此变体的返回类型应与 for 相同Dataset[Row]:
正如另一个答案中所指出的,您只能使用SQL 类型映射表中列出的类型(装箱或未装箱的原子类型,java.sql.Timestamp/java.sql.Date以及高级集合)。
复杂结构 ( structs/ StructTypes) 使用org.apache.spark.sql.Row. 不允许与代数数据类型或等效数据混合。例如(Scala 代码)
struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>>
Run Code Online (Sandbox Code Playgroud)
应表示为
Row(1, Row("foo", Row(-1.0, 42))))
Run Code Online (Sandbox Code Playgroud)
不是
(1, ("foo", (-1.0, 42))))
Run Code Online (Sandbox Code Playgroud)
或任何混合变体,例如
Row(1, Row("foo", (-1.0, 42))))
Run Code Online (Sandbox Code Playgroud)
提供此变体主要是为了确保 Java 互操作性。
在这种情况下(相当于所讨论的那个),定义应该类似于以下定义:
struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>>
Run Code Online (Sandbox Code Playgroud)
排除异常处理的所有细微差别(通常UDFs应该控制null输入并按照惯例优雅地处理格式错误的数据)Java 等价物应该或多或少像这样:
Row(1, Row("foo", Row(-1.0, 42))))
Run Code Online (Sandbox Code Playgroud)上下文:
为了给您一些上下文,这种区别也反映在 API 的其他部分中。例如,您可以DataFrame从架构和以下序列创建Rows:
def createDataFrame(rows: List[Row], schema: StructType): DataFrame
Run Code Online (Sandbox Code Playgroud)
或使用反射序列 Products
def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: TypeTag[A]): DataFrame
Run Code Online (Sandbox Code Playgroud)
但不支持混合变体。
换句话说,您应该提供可以使用RowEncoder.
当然,您通常不会将其udf用于这样的任务:
import org.apache.spark.sql.functions._
df.withColumn("price", struct(
split($"price", " ")(0).cast("double").alias("price"),
split($"price", " ")(1).alias("currency")
))
Run Code Online (Sandbox Code Playgroud)
相关:
| 归档时间: |
|
| 查看次数: |
4797 次 |
| 最近记录: |