定义一个接受Spark DataFrame中的对象数组的UDF?

ohr*_*uus 26 scala user-defined-functions dataframe apache-spark apache-spark-sql

使用Spark的DataFrame时,需要使用用户定义函数(UDF)来映射列中的数据.UDF要求显式指定参数类型.在我的情况下,我需要操作由对象数组组成的列,我不知道要使用什么类型.这是一个例子:

import sqlContext.implicits._

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
  """
  |{
  |  "topic" : "pets",
  |  "subjects" : [
  |    {"type" : "cat", "score" : 10},
  |    {"type" : "dog", "score" : 1}
  |  ]
  |}
  """)))
Run Code Online (Sandbox Code Playgroud)

使用内置org.apache.spark.sql.functions函数对列中的数据执行基本操作相对简单

import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show

+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets|             2|
+-----+--------------+
Run Code Online (Sandbox Code Playgroud)

并且通常很容易编写自定义UDF来执行任意操作

import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
|      PETS|             2|
+----------+--------------+
Run Code Online (Sandbox Code Playgroud)

但是,如果我想使用UDF来操作"subject"列中的对象数组呢?我在UDF中使用什么类型的参数?例如,如果我想重新实现size函数,而不是使用spark提供的函数:

val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
Run Code Online (Sandbox Code Playgroud)

显然Array[Something]不起作用......我应该使用什么类型!?我应该Array[]完全放弃吗?四处scala.collection.mutable.WrappedArray寻找告诉我可能与它有关,但仍然有我需要提供的另一种类型.

zer*_*323 24

你在寻找的是Seq[o.a.s.sql.Row]:

import org.apache.spark.sql.Row

val my_size = udf { subjects: Seq[Row] => subjects.size }
Run Code Online (Sandbox Code Playgroud)

说明:

  • 当前表现ArrayType是,因为你已经知道了,WrappedArray所以Array将无法正常工作,这是更好地留在安全方面.
  • 根据官方规格,局部(外部)类型StructTypeRow.不幸的是,这意味着对各个字段的访问不是类型安全的.

备注:

  • struct在Spark <2.3中创建,传递给的函数udf必须返回Product类型(Tuple*case class),而不是Row.那是因为相应的udf变体依赖于Scala反射:

    n个参数的Scala闭包定义为用户定义的函数(UDF).根据Scala闭包的签名自动推断数据类型.

  • 在Spark> = 2.3中Row,只要提供了模式,就可以直接返回.

    def udf(f: AnyRef, dataType: DataType): UserDefinedFunction 使用Scala闭包定义确定性用户定义函数(UDF).对于此变体,调用者必须指定输出数据类型,并且没有自动输入类型强制.

    请参阅例如如何在Java/Kotlin中创建一个返回复杂类型的Spark UDF?.