使用Spark Dataframe中的函数创建新列

J *_*ath 36 scala dataframe apache-spark

我正试图在Spark中找出新的数据帧API.看起来好像是向前迈出了一大步,但却做了一件非常简单的事情.我有一个包含2列的数据框,"ID"和"Amount".作为一个通用示例,假设我想返回一个名为"code"的新列,该列返回基于"Amt"值的代码.我可以写一个像这样的函数:

def coder(myAmt:Integer):String {
  if (myAmt > 100) "Little"
  else "Big"
}
Run Code Online (Sandbox Code Playgroud)

当我尝试使用它时:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", coder(myDF("Amt")))
Run Code Online (Sandbox Code Playgroud)

我得到类型不匹配错误

found   : org.apache.spark.sql.Column
required: Integer
Run Code Online (Sandbox Code Playgroud)

我已经尝试将我的函数的输入类型更改为org.apache.spark.sql.Column但是我随后在函数编译时开始得到错误,因为它在if语句中需要一个布尔值.

我做错了吗?有没有比使用withColumn更好/另一种方法?

谢谢你的帮助.

yjs*_*hen 54

假设您的架构中有"Amt"列:

import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))
Run Code Online (Sandbox Code Playgroud)

我认为withColumn是添加列的正确方法

  • 我从来没有像你上面那样用scala写过这样的函数.通过扩展,如果我有一个带有多个参数的更复杂的函数,我会写类似:val coder:((Int,Int)=> String)=(arg1:Int,arg2:Int)=> {if(arg1 < 100 && arg2 <100 ....? (2认同)
  • 我认为在 myDF.withColumn("Code", sqlfunc(col("Amt"))) 中,需要使用数据框指定列数据,即 myDF.withColumn("Code", sqlfunc(myDF.col("Amt") )) (2认同)

Ram*_*jan 11

我们应该避免udf由于它的列serializationdeserialization列的开销而尽可能多地定义函数.

您可以通过以下简单的when火花功能实现解决方案

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
Run Code Online (Sandbox Code Playgroud)