J *_*ath 36 scala dataframe apache-spark
我正试图在Spark中找出新的数据帧API.看起来好像是向前迈出了一大步,但却做了一件非常简单的事情.我有一个包含2列的数据框,"ID"和"Amount".作为一个通用示例,假设我想返回一个名为"code"的新列,该列返回基于"Amt"值的代码.我可以写一个像这样的函数:
def coder(myAmt:Integer):String {
if (myAmt > 100) "Little"
else "Big"
}
Run Code Online (Sandbox Code Playgroud)
当我尝试使用它时:
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", coder(myDF("Amt")))
Run Code Online (Sandbox Code Playgroud)
我得到类型不匹配错误
found : org.apache.spark.sql.Column
required: Integer
Run Code Online (Sandbox Code Playgroud)
我已经尝试将我的函数的输入类型更改为org.apache.spark.sql.Column但是我随后在函数编译时开始得到错误,因为它在if语句中需要一个布尔值.
我做错了吗?有没有比使用withColumn更好/另一种方法?
谢谢你的帮助.
yjs*_*hen 54
假设您的架构中有"Amt"列:
import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))
Run Code Online (Sandbox Code Playgroud)
我认为withColumn是添加列的正确方法
Ram*_*jan 11
我们应该避免udf由于它的列serialization和deserialization列的开销而尽可能多地定义函数.
您可以通过以下简单的when火花功能实现解决方案
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
61185 次 |
| 最近记录: |