Scala和Spark UDF功能

fan*_*ndi 10 scala apache-spark apache-spark-sql apache-zeppelin

我做了一个简单的UDF来转换或从spark中的temptabl中的时间字段中提取一些值.我注册了该函数,但是当我使用sql调用该函数时,它会抛出一个NullPointerException.以下是我的功能和执行过程.我正在使用Zeppelin.扼杀这是昨天工作,但它今天早上停止工作.

功能

def convert( time:String ) : String = {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  return sdf.format(time1)
}
Run Code Online (Sandbox Code Playgroud)

注册功能

sqlContext.udf.register("convert",convert _)
Run Code Online (Sandbox Code Playgroud)

没有SQL测试函数 - 这是有效的

convert(12:12:12) -> returns 12:12
Run Code Online (Sandbox Code Playgroud)

在Zeppelin这个FAILS中用SQL测试函数.

%sql
select convert(time) from temptable limit 10
Run Code Online (Sandbox Code Playgroud)

结构的诱惑力

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- request: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- sourceip: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我得到的堆栈跟踪的一部分.

java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
Run Code Online (Sandbox Code Playgroud)

Roc*_*ang 13

使用udf而不是直接定义函数

import org.apache.spark.sql.functions._

val convert = udf[String, String](time => {
        val sdf = new java.text.SimpleDateFormat("HH:mm")
        val time1 = sdf.parse(time)
        sdf.format(time1)
    }
)
Run Code Online (Sandbox Code Playgroud)

udf的输入参数是Column(或Columns).返回类型是Column.

case class UserDefinedFunction protected[sql] (
    f: AnyRef,
    dataType: DataType,
    inputTypes: Option[Seq[DataType]]) {

  def apply(exprs: Column*): Column = {
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
  }
}
Run Code Online (Sandbox Code Playgroud)


kfk*_*ili 5

您必须将函数定义为 UDF。

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

val convertUDF: UserDefinedFunction = udf((time:String) => {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
})
Run Code Online (Sandbox Code Playgroud)

接下来,您将在 DataFrame 上应用您的 UDF。

// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // using the same name replaces existing
Run Code Online (Sandbox Code Playgroud)

现在,至于您的实际问题,您收到此错误的一个原因可能是您的 DataFrame 包含空行。如果您在应用 UDF 之前将它们过滤掉,您应该能够继续没有问题。

dataFrame.filter(col("time").isNotNull)
Run Code Online (Sandbox Code Playgroud)

我很好奇在运行 UDF 时除了遇到 null 之外还有什么原因导致 NullPointerException,如果您发现与我的建议不同的原因,我很高兴知道。