Pri*_*ain 1 scala apache-spark spark-dataframe
I am running the following code to create a DataFrame from a text file.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StructType, StringType, StructField}
/**
* Created by PSwain on 6/19/2016.
*/
object RddToDataframe extends App {
val scnf=new SparkConf().setAppName("RddToDataFrame").setMaster("local[1]")
val sc = new SparkContext(scnf)
val sqlContext = new SQLContext(sc)
val employeeRdd=sc.textFile("C:\\Users\\pswain\\IdeaProjects\\test1\\src\\main\\resources\\employee")
//Creating schema
val employeeSchemaString="id name age"
val schema = StructType(employeeSchemaString.split(",").map( colNmae => StructField(colNmae,StringType,true)))
//Creating RowRdd
val rowRdd= employeeRdd.map(row => row.split(",")).map(row => Row(row(0).trim.toInt,row(1),row(2).trim.toInt))
//Creating dataframe = RDD[rowRdd] + schema
val employeeDF=sqlContext.createDataFrame(rowRdd,schema). registerTempTable("Employee")
sqlContext.sql("select * from Employee").show()
}
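But when I run it in IntelliJ, I get the type mismatch error shown below. I can't see why it happens, since I'm only converting strings to integers. The employee file contains the following input, one record per line:
1201,satish,25
1202,krishna,28
1203,amith,39
1204,javed,23
1205,prudvi,23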
16/06/19 15:18:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
scala.MatchError: 1201 (of class java.lang.Integer)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
When you create the schema, every column type is defined as StringType:
val schema = StructType(employeeSchemaString.split(",").map( colNmae => StructField(colNmae,StringType,true)))
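But the rows in rowRdd hold Int, String and Int values. When Spark converts the first value for the StringType column, it receives a java.lang.Integer (1201) instead of a String, which is the scala.MatchError in the stack trace. Because Row(...) accepts values of type Any, the compiler cannot catch this mismatch; it only shows up at runtime.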
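The following is a simplified, Spark-free stand-in for the converter named in the stack trace, to show why the failure is a MatchError. It assumes, as the trace suggests, that the real StringConverter pattern-matches on the incoming value and only handles strings; `StringConverterDemo` and `toCatalystString` are hypothetical names, not Spark's actual source.

```scala
// Hypothetical, simplified stand-in for a StringType converter (not Spark's real code).
object StringConverterDemo {
  // Only String values are matched; any other runtime type falls through and throws scala.MatchError
  def toCatalystString(value: Any): String = value match {
    case s: String => s
  }

  def main(args: Array[String]): Unit = {
    println(toCatalystString("satish")) // satish
    // row(0).trim.toInt yields an Int, which is boxed to java.lang.Integer when passed as Any
    try toCatalystString(1201)
    catch { case e: MatchError => println(e.getMessage) } // 1201 (of class java.lang.Integer)
  }
}
```

The fix is therefore on the schema side: declare the column as IntegerType so that the converter chosen for it expects an integer.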
Here is the working code:
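import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Declare each column with the type its Row values actually have
val structType = {
  val id = StructField("id", IntegerType)
  val name = StructField("name", StringType)
  val age = StructField("age", IntegerType)
  new StructType(Array(id, name, age))
}
// id and age are parsed to Int to match IntegerType; name stays a String
val rowRdd = employeeRdd.map(row => row.split(",")).map(row => Row(row(0).trim().toInt, row(1), row(2).trim().toInt))
sqlContext.createDataFrame(rowRdd, structType).registerTempTable("Employee")
sqlContext.sql("select * from Employee").show()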
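There is a second, independent problem in the question's schema code that the working code sidesteps by listing the fields explicitly. `employeeSchemaString` is space-separated but is split on commas, so the schema would contain a single column named "id name age" while each Row carries three values. The plain-Scala check below shows the difference; `SchemaStringSplit` is a hypothetical name used only for illustration.

```scala
object SchemaStringSplit {
  val employeeSchemaString = "id name age"
  // The string contains no comma, so split(",") returns it unchanged as a single element
  val byComma: Array[String] = employeeSchemaString.split(",")
  // Splitting on runs of whitespace yields the three intended column names
  val byWhitespace: Array[String] = employeeSchemaString.split("\\s+")

  def main(args: Array[String]): Unit = {
    println(byComma.toList)      // List(id name age)
    println(byWhitespace.toList) // List(id, name, age)
  }
}
```

If you keep the string-driven approach, split on whitespace and still assign IntegerType to id and age rather than mapping every name to StringType.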
Views: 4816