使用嵌套结构作为输入参数的Spark UDF

0x6*_*C38 2 scala apache-spark

我正在尝试df使用以下数据进行操作:

+---+----------------------------------------------------+
|ka |readingsWFreq                                       |
+---+----------------------------------------------------+
|?  |[[[?,?],220], [[?,??],353], [[?,??],47074]]   |
|?  |[[[?,??],235579]]                                |
Run Code Online (Sandbox Code Playgroud)

以及以下结构:

root
 |-- ka: string (nullable = true)
 |-- readingsWFreq: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- furigana: struct (nullable = true)
 |    |    |    |-- _1: string (nullable = true)
 |    |    |    |-- _2: string (nullable = true)
 |    |    |-- Occ: long (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我的目标是将readingsWFreq的值分为三个不同的列。为此,我尝试udf如下使用:

val uExtractK = udf((kWFreq:Seq[((String, String), Long)]) => kWFreq.map(_._1._1))
val uExtractR = udf((kWFreq:Seq[((String, String), Long)]) => kWFreq.map(_._1._2))
val uExtractN = udf((kWFreq:Seq[((String, String), Long)]) => kWFreq.map(_._2)

val df2 = df.withColumn("K", uExtractK('readingsWFreq))
            .withColumn("R", uExtractR('readingsWFreq))
            .withColumn("N", uExtractN('readingsWFreq))
            .drop('readingsWFreq)
Run Code Online (Sandbox Code Playgroud)

但是,我得到一个与udfs 的输入参数有关的异常:

[error] (run-main-0) org.apache.spark.sql.AnalysisException: cannot resolve
'UDF(readingsWFreq)' due to data type mismatch: argument 1 requires
 array<struct<_1:struct<_1:string,_2:string>,_2:bigint>> type, however, 
'`readingsWFreq`' is of
 array<struct<furigana:struct<_1:string,_2:string>,Occ:bigint>> type.;;
Run Code Online (Sandbox Code Playgroud)

我的问题是,我该如何操纵数据框,以便产生以下结果?

+---+----------------------------------------------------+
|ka |K            |R               |N                    |
+---+----------------------------------------------------+
|?  |[?, ?, ?] | [?, ??, ??] | [220, 353, 47074]   |
|?  |[?]        | [??]          | [235579]            |
Run Code Online (Sandbox Code Playgroud)

Rap*_*oth 6

数据框API方法:

为此,您不需要UDF,只需执行以下操作:

df.select(
  $"readingsWFreq.furigana._1".as("K"),
  $"readingsWFreq.furigana._2".as("R"),
  $"i.Occ".as("N")
)
Run Code Online (Sandbox Code Playgroud)

这里的技巧是.在类型的列上array还充当映射/投影运算符。在类型的列上,struct此运算符用于选择元素。

UDF方法

您不能将元组传递给UDF,而需要将它们作为Rows 传递,请参见例如将Spark UDF与结构序列一起使用

在您的情况下,您有嵌套的元组,因此您需要将行分解两次:

import org.apache.spark.sql.Row


val uExtractK = udf((kWFreq:Seq[Row]) => kWFreq.map(r => r.getAs[Row](0).getAs[String](0)))
val uExtractR = udf((kWFreq:Seq[Row]) => kWFreq.map(r => r.getAs[Row](0).getAs[String](1)))
val uExtractN = udf((kWFreq:Seq[Row]) => kWFreq.map(r => r.getAs[Long](1)))
Run Code Online (Sandbox Code Playgroud)

或在上进行模式匹配Row

val uExtractK = udf((kWFreq:Seq[Row]) => kWFreq.map{case Row(kr:Row,n:Long) => kr match {case Row(k:String,r:String) => k}})
val uExtractR = udf((kWFreq:Seq[Row]) => kWFreq.map{case Row(kr:Row,n:Long) => kr match {case Row(k:String,r:String) => r}})
val uExtractN = udf((kWFreq:Seq[Row]) => kWFreq.map{case Row(kr:Row,n:Long) =>  n})
Run Code Online (Sandbox Code Playgroud)