fan*_*ndi 10 scala apache-spark apache-spark-sql apache-zeppelin
我做了一个简单的UDF来转换或从spark中的temptabl中的时间字段中提取一些值.我注册了该函数,但是当我使用sql调用该函数时,它会抛出一个NullPointerException.以下是我的功能和执行过程.我正在使用Zeppelin.扼杀这是昨天工作,但它今天早上停止工作.
功能
def convert( time:String ) : String = {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
return sdf.format(time1)
}
Run Code Online (Sandbox Code Playgroud)
注册功能
sqlContext.udf.register("convert",convert _)
Run Code Online (Sandbox Code Playgroud)
没有SQL测试函数 - 这是有效的
convert(12:12:12) -> returns 12:12
Run Code Online (Sandbox Code Playgroud)
在Zeppelin这个FAILS中用SQL测试函数.
%sql
select convert(time) from temptable limit 10
Run Code Online (Sandbox Code Playgroud)
结构的诱惑力
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- serverip: string (nullable = true)
|-- request: string (nullable = true)
|-- resource: string (nullable = true)
|-- protocol: integer (nullable = true)
|-- sourceip: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我得到的堆栈跟踪的一部分.
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
Run Code Online (Sandbox Code Playgroud)
Roc*_*ang 13
使用udf而不是直接定义函数
import org.apache.spark.sql.functions._
val convert = udf[String, String](time => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
}
)
Run Code Online (Sandbox Code Playgroud)
udf的输入参数是Column(或Columns).返回类型是Column.
case class UserDefinedFunction protected[sql] (
f: AnyRef,
dataType: DataType,
inputTypes: Option[Seq[DataType]]) {
def apply(exprs: Column*): Column = {
Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
}
}
Run Code Online (Sandbox Code Playgroud)
您必须将函数定义为 UDF。
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
val convertUDF: UserDefinedFunction = udf((time:String) => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
})
Run Code Online (Sandbox Code Playgroud)
接下来,您将在 DataFrame 上应用您的 UDF。
// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // using the same name replaces existing
Run Code Online (Sandbox Code Playgroud)
现在,至于您的实际问题,您收到此错误的一个原因可能是您的 DataFrame 包含空行。如果您在应用 UDF 之前将它们过滤掉,您应该能够继续没有问题。
dataFrame.filter(col("time").isNotNull)
Run Code Online (Sandbox Code Playgroud)
我很好奇在运行 UDF 时除了遇到 null 之外还有什么原因导致 NullPointerException,如果您发现与我的建议不同的原因,我很高兴知道。
归档时间: |
|
查看次数: |
18119 次 |
最近记录: |