将可空列作为参数传递给Spark SQL UDF

Sud*_*adi 6 apache-spark apache-spark-sql

这是一个Spark UDF,我用它来计算使用少量列的值.

def spark_udf_func(s: String, i:Int): Boolean = { 
    // I'm returning true regardless of the parameters passed to it.
    true
}

val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)

val df = sc.parallelize(Array[(Option[String], Option[Int])](
  (Some("Rafferty"), Some(31)), 
  (null, Some(33)), 
  (Some("Heisenberg"), Some(33)),  
  (Some("Williams"), null)
)).toDF("LastName", "DepartmentID")

df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()
Run Code Online (Sandbox Code Playgroud)
+----------+------------+-----+
|  LastName|DepartmentID|valid|
+----------+------------+-----+
|  Rafferty|          31| true|
|      null|          33| true|
|Heisenberg|          33| true|
|  Williams|        null| null|
+----------+------------+-----+
Run Code Online (Sandbox Code Playgroud)

任何人都可以解释为什么最后一行的列有效值为null?

当我检查了火花计划时,我能够发现该计划有一个案例条件,它说如果column2(DepartmentID)为null,则必须返回null.

== Physical Plan ==

*Project [_1#699 AS LastName#702, _2#700 AS DepartmentID#703, if (isnull(_2#700)) null else UDF(_1#699, _2#700) AS valid#717]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), assertnotnull(input[0, scala.Tuple2, true])._1), true) AS _1#699, unwrapoption(IntegerType, assertnotnull(input[0, scala.Tuple2, true])._2) AS _2#700]
   +- Scan ExternalRDDScan[obj#698]
Run Code Online (Sandbox Code Playgroud)

为什么我们在Spark中有这样的行为?
为什么只有Integer列?
我在这里做错了什么,当UDF参数为null时,在UDF中处理null的正确方法是什么?

Ass*_*son 7

问题是null不是scala Int的有效值(它是支持值),而它是String的有效值.Int等同于java int原语,并且必须具有值.这意味着当值为null时无法调用udf,因此保留null.

有两种方法可以解决这个问题:

  1. 更改函数以接受java.lang.Integer(这是一个对象,可以为null)
  2. 如果你不能改变这个功能,你可以使用when /否则在null的情况下做一些特别的事情.例如when(col("int col").isNull,someValue).otherwise(原始调用)

可以在这里找到对此的一个很好的解释

  • 还有第三个选项可以让您坚持使用 Scala Int:将参数打包在一个结构中(使用 `df.withColumn("valid", spark_udf(struct(df.col("LastName"), df.col(") DepartmentID"))))`) 并使用 `Row` 作为 udf 的输入参数。在 udf 中,您可以使用 `row.isNullAt(i: Int)` 检查该行的空值 (2认同)