如何从Spark中的文本文件创建DataFrame

Rah*_*hul 15 scala dataframe apache-spark rdd apache-spark-sql

我在HDFS上有一个文本文件,我想将它转换为Spark中的数据框.

我使用Spark Context加载文件,然后尝试从该文件生成单个列.

val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))
Run Code Online (Sandbox Code Playgroud)

执行此操作后,我正在尝试以下操作.

myFile1.toDF()
Run Code Online (Sandbox Code Playgroud)

我遇到了问题,因为myFile1 RDD中的元素现在是数组类型.

我该如何解决这个问题?

Tza*_*har 16

更新 - 从Spark 1.6开始,您可以简单地使用内置的csv数据源:

spark: SparkSession = // create the Spark Session
val df = spark.read.csv("file.txt")
Run Code Online (Sandbox Code Playgroud)

您还可以使用各种选项来控制CSV解析,例如:

val df = spark.read.option("header", "false").csv("file.txt")
Run Code Online (Sandbox Code Playgroud)

对于Spark版本<1.6:最简单的方法是使用spark-csv - 将它包含在依赖项中并遵循README,它允许设置自定义分隔符(;),可以读取CSV标题(如果你有它们),它可以推断模式类型(具有额外扫描数据的成本).

或者,如果您知道模式,则可以创建表示它的案例类,并在转换为DataFrame之前将RDD元素映射到此类的实例中,例如:

case class Record(id: Int, name: String)

val myFile1 = myFile.map(x=>x.split(";")).map {
  case Array(id, name) => Record(id.toInt, name)
} 

myFile1.toDF() // DataFrame will have columns "id" and "name"
Run Code Online (Sandbox Code Playgroud)


mga*_*ido 6

如果要使用该toDF方法,则必须将RDDof Array[String]转换RDD为case类。例如,您必须执行以下操作:

case class Test(id:String,filed2:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
Run Code Online (Sandbox Code Playgroud)

  • 谢谢你的回答马克。它应该有一个绿色的勾号,但是Tzach在不到一秒钟的时间内就有了类似的答案,我最终接受了他的解决方案。+1为您提供帮助。 (2认同)

小智 6

我已经给出了从文本文件创建DataFrame的不同方法

val conf = new SparkConf().setAppName(appName).setMaster("local")
val sc = SparkContext(conf)
Run Code Online (Sandbox Code Playgroud)

原始文本文件

val file = sc.textFile("C:\\vikas\\spark\\Interview\\text.txt")
val fileToDf = file.map(_.split(",")).map{case Array(a,b,c) => 
(a,b.toInt,c)}.toDF("name","age","city")
fileToDf.foreach(println(_))
Run Code Online (Sandbox Code Playgroud)

没有架构的火花会话

import org.apache.spark.sql.SparkSession
val sparkSess = 
SparkSession.builder().appName("SparkSessionZipsExample")
.config(conf).getOrCreate()

val df = sparkSess.read.option("header", 
"false").csv("C:\\vikas\\spark\\Interview\\text.txt")
df.show()
Run Code Online (Sandbox Code Playgroud)

与架构的火花会话

import org.apache.spark.sql.types._
val schemaString = "name age city"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, 
StringType, nullable=true))
val schema = StructType(fields)

val dfWithSchema = sparkSess.read.option("header", 
"false").schema(schema).csv("C:\\vikas\\spark\\Interview\\text.txt")
dfWithSchema.show()
Run Code Online (Sandbox Code Playgroud)

使用sql上下文

import org.apache.spark.sql.SQLContext

val fileRdd = 
sc.textFile("C:\\vikas\\spark\\Interview\\text.txt").map(_.split(",")).map{x 
=> org.apache.spark.sql.Row(x:_*)}
val sqlDf = sqlCtx.createDataFrame(fileRdd,schema)
sqlDf.show()
Run Code Online (Sandbox Code Playgroud)