Spark - 将整行传递给 udf,然后在 udf 中获取列名

use*_*122 4 scala apache-spark

我正在将 Spark 与 Scala 一起使用,并希望将整行传递给 udf 并选择 udf 中的每个列名和列值。我怎样才能做到这一点?

我正在尝试以下 -

inputDataDF.withColumn("errorField", mapCategory(ruleForNullValidation) (col(_*)))

def mapCategory(categories: Map[String, Boolean]) = {
  udf((input:Row) =>  //write a recursive function to check if each row is in categories if yes check for null if null then false, repeat this for all columns and then combine results)   
})
Run Code Online (Sandbox Code Playgroud)

hi-*_*zir 5

在 Spark 1.6 中,您可以Row用作外部类型和struct 表达式。作为表达。列名可以从架构中获取。例如:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct}

val df = Seq((1, 2, 3)).toDF("a", "b", "c")
val f = udf((row: Row) => row.schema.fieldNames)
df.select(f(struct(df.columns map col: _*))).show

// +-----------------------------------------------------------------------------+
// |UDF(named_struct(NamePlaceholder, a, NamePlaceholder, b, NamePlaceholder, c))|
// +-----------------------------------------------------------------------------+
// |                                                                    [a, b, c]|
// +-----------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

可以使用Row.getAs方法按名称访问值。