如何在 Java/Kotlin 中创建一个返回复杂类型的 Spark UDF?

Hex*_*rks 7 java user-defined-functions kotlin apache-spark

我正在尝试编写一个返回复杂类型的 UDF:

private val toPrice = UDF1<String, Map<String, String>> { s ->
    val elements = s.split(" ")
    mapOf("value" to elements[0], "currency" to elements[1])
}


val type = DataTypes.createStructType(listOf(
        DataTypes.createStructField("value", DataTypes.StringType, false),
        DataTypes.createStructField("currency", DataTypes.StringType, false)))
df.sqlContext().udf().register("toPrice", toPrice, type)
Run Code Online (Sandbox Code Playgroud)

但任何时候我使用这个:

df = df.withColumn("price", callUDF("toPrice", col("price")))
Run Code Online (Sandbox Code Playgroud)

我收到一个神秘的错误:

private val toPrice = UDF1<String, Map<String, String>> { s ->
    val elements = s.split(" ")
    mapOf("value" to elements[0], "currency" to elements[1])
}


val type = DataTypes.createStructType(listOf(
        DataTypes.createStructField("value", DataTypes.StringType, false),
        DataTypes.createStructField("currency", DataTypes.StringType, false)))
df.sqlContext().udf().register("toPrice", toPrice, type)
Run Code Online (Sandbox Code Playgroud)

我尝试使用自定义数据类型:

class Price(val value: Double, val currency: String) : Serializable
Run Code Online (Sandbox Code Playgroud)

带有返回该类型的 UDF:

private val toPrice = UDF1<String, Price> { s ->
    val elements = s.split(" ")
    Price(elements[0].toDouble(), elements[1])
}
Run Code Online (Sandbox Code Playgroud)

但后来我得到另一个MatchError抱怨Price类型。

如何正确编写可以返回复杂类型的 UDF?

zer*_*323 9

TL;DR该函数应该返回一个类的对象org.apache.spark.sql.Row

Spark 提供了两种主要的UDF定义变体。

  1. udf 使用 Scala 反射的变体:

    • def udf[RT](f: () ? RT)(implicit arg0: TypeTag[RT]): UserDefinedFunction
    • def udf[RT, A1](f: (A1) ? RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1]): UserDefinedFunction
    • ...
    • def udf[RT, A1, A2, ..., A10](f: (A1, A2, ..., A10) ? RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1], arg2: TypeTag[A2], ..., arg10: TypeTag[A10])

    其中定义

    作为用户定义函数 (UDF) 的 ... 参数的 Scala 闭包。数据类型是根据 Scala 闭包的签名自动推断的。

    这些变体在没有模式的情况下使用原子或代数数据类型。例如,有问题的函数将在 Scala 中定义:

    case class Price(value: Double, currency: String) 
    
    val df = Seq("1 USD").toDF("price")
    
    val toPrice = udf((s: String) => scala.util.Try { 
      s split(" ") match {
        case Array(price, currency) => Price(price.toDouble, currency)
      }
    }.toOption)
    
    df.select(toPrice($"price")).show
    // +----------+
    // |UDF(price)|
    // +----------+
    // |[1.0, USD]|
    // +----------+
    
    Run Code Online (Sandbox Code Playgroud)

    在这个变体中,返回类型是自动编码的。

    由于它依赖于反射,这个变体主要面向 Scala 用户。

  2. udf提供模式定义的变体(您在此处使用的一个)。此变体的返回类型应与 for 相同Dataset[Row]

    提供此变体主要是为了确保 Java 互操作性。

    在这种情况下(相当于所讨论的那个),定义应该类似于以下定义:

    struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>>
    
    Run Code Online (Sandbox Code Playgroud)

    排除异常处理的所有细微差别(通常UDFs应该控制null输入并按照惯例优雅地处理格式错误的数据)Java 等价物应该或多或少像这样:

    Row(1, Row("foo", Row(-1.0, 42))))
    
    Run Code Online (Sandbox Code Playgroud)

上下文

为了给您一些上下文,这种区别也反映在 API 的其他部分中。例如,您可以DataFrame从架构和以下序列创建Rows

def createDataFrame(rows: List[Row], schema: StructType): DataFrame 
Run Code Online (Sandbox Code Playgroud)

或使用反射序列 Products

def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: TypeTag[A]): DataFrame 
Run Code Online (Sandbox Code Playgroud)

但不支持混合变体。

换句话说,您应该提供可以使用RowEncoder.

当然,您通常不会将其udf用于这样的任务:

import org.apache.spark.sql.functions._

df.withColumn("price", struct(
  split($"price", " ")(0).cast("double").alias("price"),
  split($"price", " ")(1).alias("currency")
))
Run Code Online (Sandbox Code Playgroud)

相关


小智 2

很简单。转到数据类型参考并找到相应的类型。

在 Spark 2.3 中

  • 如果您将返回类型声明为StructType函数必须返回org.apache.spark.sql.Row
  • 如果你返回Map<String, String>函数的返回类型应该是MapType- 显然不是你想要的。