hp2*_*326 5 struct apache-spark udf
I am trying to pass a struct in Spark to a UDF. It changes the field names, renaming them to the column positions. How do I fix this?
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions.{struct, udf}

object TestCSV {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("localTest").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val inputData = sqlContext.read.format("com.databricks.spark.csv")
      .option("delimiter", "|")
      .option("header", "true")
      .load("test.csv")
    inputData.printSchema()
    inputData.show()
    // Combine firstname and lastname into a single struct column
    val groupedData = inputData.withColumn("name", struct(inputData("firstname"), inputData("lastname")))
    val udfApply = groupedData.withColumn("newName", processName(groupedData("name")))
    udfApply.show()
  }

  // Looks the struct fields up by name; this is where the exception is thrown
  def processName = udf((input: Row) => {
    println(input)
    println(input.schema)
    Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname"))
  })
}
Output:
root
|-- id: string (nullable = true)
|-- firstname: string (nullable = true)
|-- lastname: string (nullable = true)
+---+---------+--------+
| id|firstname|lastname|
+---+---------+--------+
| 1| jack| reacher|
| 2| john| Doe|
+---+---------+--------+
Error:
[jack,reacher]
StructType(StructField(i[1],StringType,true), StructField(i[2],StringType,true))
17/03/08 09:45:35 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.IllegalArgumentException: Field "firstname" does not exist.
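What you are running into is really strange. After some digging, I found that it is probably related to an issue in the optimizer engine. The problem appears to lie not in the UDF but in the struct function.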
I got it to work (Spark 1.6.3) by caching groupedData; without the cache, I get the exception you reported:
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
object Demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[1]"))
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    def processName = udf((input: Row) => {
      Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname"))
    })

    val inputData =
      sc.parallelize(
        Seq(("1", "Kevin", "Costner"))
      ).toDF("id", "firstname", "lastname")

    val groupedData = inputData.withColumn("name", struct(inputData("firstname"), inputData("lastname")))
      .cache() // does not work without cache

    val udfApply = groupedData.withColumn("newName", processName(groupedData("name")))
    udfApply.show()
  }
}
Alternatively, you can use the RDD API to build the struct, but that is not very nice:
case class Name(firstname: String, lastname: String) // define outside main

val groupedData = inputData.rdd
  .map { r =>
    (r.getAs[String]("id"),
      Name(
        r.getAs[String]("firstname"),
        r.getAs[String]("lastname")
      )
    )
  }
  .toDF("id", "name")
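
Another possible workaround, going by the output printed in the question: the row still holds its values in order ([jack,reacher]) even though the field names were replaced by i[1] and i[2], so reading the fields by position rather than by name may sidestep the lookup that fails. The following is only a sketch that I have not run against Spark 1.6.3. The names nameMap and processNameByPosition are made up for illustration, and it assumes the struct was built as struct(firstname, lastname), in that order:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Hypothetical helper: builds the map from field positions, not field names.
// Assumes position 0 holds firstname and position 1 holds lastname.
def nameMap(input: Row): Map[String, String] =
  Map(
    "firstName" -> input.getString(0),
    "lastName" -> input.getString(1)
  )

// Hypothetical UDF wrapping the helper; meant as a drop-in for processName.
def processNameByPosition = udf((input: Row) => nameMap(input))

// Usage, with groupedData as defined in the question:
// val udfApply = groupedData.withColumn("newName", processNameByPosition(groupedData("name")))
```

The trade-off is that positional access silently returns the wrong values if the order of the columns passed to struct ever changes, so the caching approach above is probably safer when it is an option.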