Converting an RDD to a DataFrame in Spark/Scala

Asked by spa*_*ler · 6 · Tags: hadoop, scala, apache-spark

An RDD has been created in the format Array[Array[String]] and has the following values:

val rdd: Array[Array[String]] = Array(
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
  Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1"))

I want to create a DataFrame with the schema:

val schemaString = "callId oCallId callTime duration calltype swId"

The next steps:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2), p(3), p(4), p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

give the following error:

console:45: error: overloaded method value createDataFrame with alternatives:
  (rdd: org.apache.spark.api.java.JavaRDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame
  (rdd: org.apache.spark.rdd.RDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[Array[String]], org.apache.spark.sql.types.StructType)
       val calDF = sqlContext.createDataFrame(rowRDD, schema)
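
The error says that createDataFrame has no overload accepting an RDD[Array[String]]; the closest alternative expects an RDD[Row] together with a StructType. A minimal sketch of that explicit-schema route, assuming the rdd array and schemaString defined above (paste into spark-shell):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType}

// Build a StructType with one nullable string field per name in schemaString.
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))

// The question's `rdd` is really a local Array, so parallelize it first,
// then convert each Array[String] into a Row before applying the schema.
val rowRDD = sc.makeRDD(rdd).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim))
val calDF = sqlContext.createDataFrame(rowRDD, schema)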

Answer by Ber*_*ium · 12

Just paste this into spark-shell:

val a = 
  Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd = sc.makeRDD(a)

case class X(callId: String, oCallId: String, 
  callTime: String, duration: String, calltype: String, swId: String)

Then map() over the RDD to create instances of the case class, and create the DataFrame with toDF():

scala> val df = rdd.map { 
  case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame = 
  [callId: string, oCallId: string, callTime: string, 
    duration: string, calltype: string, swId: string]

This infers the schema from the case class.
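
A side note on the inference: the column types follow the case class's field types, so a non-String field yields a non-string column. A tiny sketch with a hypothetical case class Y (same spark-shell session; expected output shown as comments):

case class Y(callId: String, duration: Int)

sc.makeRDD(Seq(("4580056797", 0)))
  .map { case (id, d) => Y(id, d) }
  .toDF()
  .printSchema()
// root
//  |-- callId: string (nullable = true)
//  |-- duration: integer (nullable = false)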

Then you can proceed with:

scala> df.printSchema()
root
 |-- callId: string (nullable = true)
 |-- oCallId: string (nullable = true)
 |-- callTime: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- calltype: string (nullable = true)
 |-- swId: string (nullable = true)

scala> df.show()
+----------+-------+-------------------+--------+--------+----+
|    callId|oCallId|           callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
+----------+-------+-------------------+--------+--------+----+

If you want to use toDF() in a standalone program (and not in spark-shell), make sure (quoting from here) to follow these two rules, both applied in the sketch after this list:

  • import sqlContext.implicits._ right after creating the SQLContext
  • define the case class outside of the method that uses toDF()
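
Putting both rules together, a minimal standalone sketch (the object name CallsToDF and the local master setting are illustrative, using the same Spark 1.x SQLContext API as above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Rule 2: the case class lives outside the method that calls toDF().
case class X(callId: String, oCallId: String, callTime: String,
  duration: String, calltype: String, swId: String)

object CallsToDF {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("CallsToDF").setMaster("local[*]"))  // local run for illustration
    val sqlContext = new SQLContext(sc)
    // Rule 1: import the implicits right after creating the SQLContext.
    import sqlContext.implicits._

    val rdd = sc.makeRDD(Seq(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")))
    val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) =>
      X(s0, s1, s2, s3, s4, s5) }.toDF()
    df.show()
  }
}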