use*_*122 4 scala apache-spark
我正在将 Spark 与 Scala 一起使用,并希望将整行传递给 udf 并选择 udf 中的每个列名和列值。我怎样才能做到这一点?
我正在尝试以下 -
inputDataDF.withColumn("errorField", mapCategory(ruleForNullValidation) (col(_*)))
def mapCategory(categories: Map[String, Boolean]) = {
udf((input:Row) => //write a recursive function to check if each row is in categories if yes check for null if null then false, repeat this for all columns and then combine results)
})
Run Code Online (Sandbox Code Playgroud)
在 Spark 1.6 中,您可以Row用作外部类型和struct 表达式。作为表达。列名可以从架构中获取。例如:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct}
val df = Seq((1, 2, 3)).toDF("a", "b", "c")
val f = udf((row: Row) => row.schema.fieldNames)
df.select(f(struct(df.columns map col: _*))).show
// +-----------------------------------------------------------------------------+
// |UDF(named_struct(NamePlaceholder, a, NamePlaceholder, b, NamePlaceholder, c))|
// +-----------------------------------------------------------------------------+
// | [a, b, c]|
// +-----------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
可以使用Row.getAs方法按名称访问值。
| 归档时间: |
|
| 查看次数: |
6043 次 |
| 最近记录: |