相关疑难解决方法(0)

使用空/空字段值创建新的Dataframe

我正在从现有数据框架创建一个新的Dataframe,但需要在这个新DF中添加新列(下面代码中的"field1").我该怎么办?工作示例代码示例将不胜感激.

val edwDf = omniDataFrame 
  .withColumn("field1", callUDF((value: String) => None)) 
  .withColumn("field2",
    callUdf("devicetypeUDF", (omniDataFrame.col("some_field_in_old_df")))) 

edwDf
  .select("field1", "field2")
  .save("odsoutdatafldr", "com.databricks.spark.csv"); 
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql

25
推荐指数
2
解决办法
5万
查看次数

Scala和Spark UDF功能

我做了一个简单的UDF来转换或从spark中的temptabl中的时间字段中提取一些值.我注册了该函数,但是当我使用sql调用该函数时,它会抛出一个NullPointerException.以下是我的功能和执行过程.我正在使用Zeppelin.扼杀这是昨天工作,但它今天早上停止工作.

功能

def convert( time:String ) : String = {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  return sdf.format(time1)
}
Run Code Online (Sandbox Code Playgroud)

注册功能

sqlContext.udf.register("convert",convert _)
Run Code Online (Sandbox Code Playgroud)

没有SQL测试函数 - 这是有效的

convert(12:12:12) -> returns 12:12
Run Code Online (Sandbox Code Playgroud)

在Zeppelin这个FAILS中用SQL测试函数.

%sql
select convert(time) from temptable limit 10
Run Code Online (Sandbox Code Playgroud)

结构的诱惑力

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- request: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- sourceip: …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-zeppelin

10
推荐指数
2
解决办法
2万
查看次数

在Spark数据帧中将所有":"替换为"_"

我正在尝试在Spark数据帧的单个列中替换":" - >"_"的所有实例.我正在尝试这样做:

val url_cleaner = (s:String) => {
   s.replaceAll(":","_")
}
val url_cleaner_udf = udf(url_cleaner)
val df = old_df.withColumn("newCol", url_cleaner_udf(old_df("oldCol")) )
Run Code Online (Sandbox Code Playgroud)

但我一直收到错误:

 SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0 (TID 692, ip-10-81-194-29.ec2.internal): java.lang.NullPointerException
Run Code Online (Sandbox Code Playgroud)

我在udf哪里出错了?

scala user-defined-functions apache-spark spark-dataframe

4
推荐指数
1
解决办法
2万
查看次数

Spark UDF空处理

我正在努力处理在由浮点数结构组成的数据帧(源自配置单元表)上运行的UDF中的空值:

数据框(points)具有以下架构:

root
 |-- point: struct (nullable = true)
 |    |-- x: float (nullable = true)
 |    |-- y: float (nullable = true)
Run Code Online (Sandbox Code Playgroud)

例如,我要计算x和y的总和。请注意,我不“处理”空值在下面的例子,但我希望能够在我的UDF检查是否pointx或者ynull

第一种方法:

val sum = udf((x:Float,y:Float) => x+y)

points.withColumn("sum",sum($"point.x",$"point.y"))
Run Code Online (Sandbox Code Playgroud)

如果该struct点为null,则此方法不起作用,在这种情况下,永远不会评估udf(永远不会执行udf中的代码!),结果为null。另外,我无法检查xy为null,因为Floats在scala中不能为null。

第二种方法:

val sum = udf((pt:Row) => pt.getFloat(0)+pt.getFloat(1))
points.withColumn("sum",sum($"point"))
Run Code Online (Sandbox Code Playgroud)

使用这种方法,我可以pt在udf中检查是否为空,但是我可以检查x并且y因为Floats不能为空。NullPointerException在这种情况下,我得到一个。

如何编写udf win,可以检查struct以及x和y是否为null?

我正在使用spark 1.6.1

更新:与这个问题相反,我在处理浮点数而不是字符串(scala中的字符串可以为null,而浮点数则不是)

scala apache-spark udf

4
推荐指数
1
解决办法
1556
查看次数

如何在 Spark UDF 中使用 Option

我有一个这样的数据集:

+----+------+
|code|status|
+-----------+
|   1| "new"|
|   2|  null|
|   3|  null|
+----+------+
Run Code Online (Sandbox Code Playgroud)

我想编写一个依赖于两列的 UDF。

我按照这个答案中的第二种方法让它工作,即null在 UDF 之外处理,并写入myFn将布尔值作为第二个参数:

df.withColumn("new_column",
  when(pst_regs("status").isNull, 
    myFnUdf($"code", lit(false))
  )
  .otherwise(
    myFnUdf($"code", lit(true))
  )
)
Run Code Online (Sandbox Code Playgroud)

为了在 UDF 中处理 null,我看到的一种方法是根据这个答案讨论“用Options”包装参数。我试过这样的代码:

df.withColumn("new_column", myFnUdf($"code", $"status"))

def myFn(code: Int, status: String) = (code, Option(status)) match {
  case (1, "new") => "1_with_new_status"
  case (2, Some(_)) => "2_with_any_status"
  case (3, None) => "3_no_status"
}
Run Code Online (Sandbox Code Playgroud)

但是一行null给出了type mismatch; found :None.type required …

scala apache-spark

4
推荐指数
1
解决办法
4581
查看次数

如何处理原始可空类型的Spark UDF输入/输出

问题:

1)如果输入是包含null以下内容的原始类型的列,Spark不会调用UDF :

inputDF.show()

+-----+
|  x  |
+-----+
| null|
|  1.0|
+-----+

inputDF
  .withColumn("y",
     udf { (x: Double) => 2.0 }.apply($"x") // will not be invoked if $"x" == null
  )
  .show()

+-----+-----+
|  x  |  y  |
+-----+-----+
| null| null|
|  1.0|  2.0|
+-----+-----+
Run Code Online (Sandbox Code Playgroud)

2)无法null从UDF 生成原始类型的列:

udf { (x: String) => null: Double } // compile error

sql null apache-spark udf

3
推荐指数
1
解决办法
1937
查看次数

Spark数据集是否使用空值减少?

我正在使用以下代码创建数据框:

  val data = List(
    List(444.1235D),
    List(67.5335D),
    List(69.5335D),
    List(677.5335D),
    List(47.5335D),
    List(null)
  )

  val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
  val schema = StructType(Array(
    StructField("value", DataTypes.DoubleType, true)
  ))

  val df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)

然后我将我的udf应用于它:

val multip: Dataset[Double] = df.select(doubleUdf(df("value"))).as[Double]
Run Code Online (Sandbox Code Playgroud)

然后我尝试在此数据集上使用reduce:

val multipl = multip.reduce(_ * _)
Run Code Online (Sandbox Code Playgroud)

在这里我得到0.0的结果.我也尝试过滤掉空值

val multipl = multip.filter(_ != null).reduce(_ * _)
Run Code Online (Sandbox Code Playgroud)

结果相同.如果我从数据中删除null值,一切正常.如何使用空值减少工作量?

我的udf定义如下:

val doubleUdf: UserDefinedFunction = udf((v: Any) => Try(v.toString.toDouble).toOption)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

0
推荐指数
1
解决办法
1238
查看次数