我正在从现有数据框架创建一个新的Dataframe,但需要在这个新DF中添加新列(下面代码中的"field1").我该怎么办?工作示例代码示例将不胜感激.
val edwDf = omniDataFrame
.withColumn("field1", callUDF((value: String) => None))
.withColumn("field2",
callUdf("devicetypeUDF", (omniDataFrame.col("some_field_in_old_df"))))
edwDf
.select("field1", "field2")
.save("odsoutdatafldr", "com.databricks.spark.csv");
Run Code Online (Sandbox Code Playgroud) 我做了一个简单的UDF来转换或从spark中的temptabl中的时间字段中提取一些值.我注册了该函数,但是当我使用sql调用该函数时,它会抛出一个NullPointerException.以下是我的功能和执行过程.我正在使用Zeppelin.扼杀这是昨天工作,但它今天早上停止工作.
功能
def convert( time:String ) : String = {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
return sdf.format(time1)
}
Run Code Online (Sandbox Code Playgroud)
注册功能
sqlContext.udf.register("convert",convert _)
Run Code Online (Sandbox Code Playgroud)
没有SQL测试函数 - 这是有效的
convert(12:12:12) -> returns 12:12
Run Code Online (Sandbox Code Playgroud)
在Zeppelin这个FAILS中用SQL测试函数.
%sql
select convert(time) from temptable limit 10
Run Code Online (Sandbox Code Playgroud)
结构的诱惑力
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- serverip: string (nullable = true)
|-- request: string (nullable = true)
|-- resource: string (nullable = true)
|-- protocol: integer (nullable = true)
|-- sourceip: …Run Code Online (Sandbox Code Playgroud) 我正在尝试在Spark数据帧的单个列中替换":" - >"_"的所有实例.我正在尝试这样做:
val url_cleaner = (s:String) => {
s.replaceAll(":","_")
}
val url_cleaner_udf = udf(url_cleaner)
val df = old_df.withColumn("newCol", url_cleaner_udf(old_df("oldCol")) )
Run Code Online (Sandbox Code Playgroud)
但我一直收到错误:
SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0 (TID 692, ip-10-81-194-29.ec2.internal): java.lang.NullPointerException
Run Code Online (Sandbox Code Playgroud)
我在udf哪里出错了?
我正在努力处理在由浮点数结构组成的数据帧(源自配置单元表)上运行的UDF中的空值:
数据框(points)具有以下架构:
root
|-- point: struct (nullable = true)
| |-- x: float (nullable = true)
| |-- y: float (nullable = true)
Run Code Online (Sandbox Code Playgroud)
例如,我要计算x和y的总和。请注意,我不“处理”空值在下面的例子,但我希望能够在我的UDF检查是否point,x或者y是null。
第一种方法:
val sum = udf((x:Float,y:Float) => x+y)
points.withColumn("sum",sum($"point.x",$"point.y"))
Run Code Online (Sandbox Code Playgroud)
如果该struct点为null,则此方法不起作用,在这种情况下,永远不会评估udf(永远不会执行udf中的代码!),结果为null。另外,我无法检查x或y为null,因为Floats在scala中不能为null。
第二种方法:
val sum = udf((pt:Row) => pt.getFloat(0)+pt.getFloat(1))
points.withColumn("sum",sum($"point"))
Run Code Online (Sandbox Code Playgroud)
使用这种方法,我可以pt在udf中检查是否为空,但是我可以检查x并且y因为Floats不能为空。NullPointerException在这种情况下,我得到一个。
如何编写udf win,可以检查struct以及x和y是否为null?
我正在使用spark 1.6.1
更新:与这个问题相反,我在处理浮点数而不是字符串(scala中的字符串可以为null,而浮点数则不是)
我有一个这样的数据集:
+----+------+
|code|status|
+-----------+
| 1| "new"|
| 2| null|
| 3| null|
+----+------+
Run Code Online (Sandbox Code Playgroud)
我想编写一个依赖于两列的 UDF。
我按照这个答案中的第二种方法让它工作,即null在 UDF 之外处理,并写入myFn将布尔值作为第二个参数:
df.withColumn("new_column",
when(pst_regs("status").isNull,
myFnUdf($"code", lit(false))
)
.otherwise(
myFnUdf($"code", lit(true))
)
)
Run Code Online (Sandbox Code Playgroud)
为了在 UDF 中处理 null,我看到的一种方法是根据这个答案讨论“用Options”包装参数。我试过这样的代码:
df.withColumn("new_column", myFnUdf($"code", $"status"))
def myFn(code: Int, status: String) = (code, Option(status)) match {
case (1, "new") => "1_with_new_status"
case (2, Some(_)) => "2_with_any_status"
case (3, None) => "3_no_status"
}
Run Code Online (Sandbox Code Playgroud)
但是一行null给出了type mismatch; found :None.type required …
问题:
1)如果输入是包含null以下内容的原始类型的列,Spark不会调用UDF :
inputDF.show()
+-----+
| x |
+-----+
| null|
| 1.0|
+-----+
inputDF
.withColumn("y",
udf { (x: Double) => 2.0 }.apply($"x") // will not be invoked if $"x" == null
)
.show()
+-----+-----+
| x | y |
+-----+-----+
| null| null|
| 1.0| 2.0|
+-----+-----+
Run Code Online (Sandbox Code Playgroud)
2)无法null从UDF 生成原始类型的列:
udf { (x: String) => null: Double } // compile error
我正在使用以下代码创建数据框:
val data = List(
List(444.1235D),
List(67.5335D),
List(69.5335D),
List(677.5335D),
List(47.5335D),
List(null)
)
val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
val schema = StructType(Array(
StructField("value", DataTypes.DoubleType, true)
))
val df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)
然后我将我的udf应用于它:
val multip: Dataset[Double] = df.select(doubleUdf(df("value"))).as[Double]
Run Code Online (Sandbox Code Playgroud)
然后我尝试在此数据集上使用reduce:
val multipl = multip.reduce(_ * _)
Run Code Online (Sandbox Code Playgroud)
在这里我得到0.0的结果.我也尝试过滤掉空值
val multipl = multip.filter(_ != null).reduce(_ * _)
Run Code Online (Sandbox Code Playgroud)
结果相同.如果我从数据中删除null值,一切正常.如何使用空值减少工作量?
我的udf定义如下:
val doubleUdf: UserDefinedFunction = udf((v: Any) => Try(v.toString.toDouble).toOption)
Run Code Online (Sandbox Code Playgroud)