Creating a Spark DataFrame from a custom data format

pra*_*ash 1 hadoop scala apache-spark spark-dataframe

I have a text file where the string REC acts as the record delimiter and a newline acts as the column delimiter. Each value has its column name attached, separated by a comma. Below is a sample of the data format:

REC
Id,19048
Term,milk
Rank,1
REC
Id,19049
Term,corn
Rank,5

With REC as the record delimiter, I now want to create a Spark DataFrame with the columns Id, Term, and Rank.

Nar*_*mar 6

Here is working code. The trick is the textinputformat.record.delimiter setting, which makes Hadoop's TextInputFormat split the file on REC instead of on newlines; each resulting chunk is then parsed into the three fields:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext


object RecordSeparator extends App {
  val conf = new SparkConf().setAppName("test").setMaster("local[1]")
    .setExecutorEnv("executor-cores", "2")
  val sc = new SparkContext(conf)

  // Tell Hadoop's TextInputFormat to split records on "REC" instead of newlines
  val hconf = new Configuration
  hconf.set("textinputformat.record.delimiter", "REC")

  val data = sc.newAPIHadoopFile("data.txt",
      classOf[TextInputFormat], classOf[LongWritable],
      classOf[Text], hconf)
    .map(x => x._2.toString.trim)   // each record is the text between two "REC" markers
    .filter(x => x.nonEmpty)        // drop the empty split before the first "REC"
    .map(x => getRecord(x))         // e.g. "19048,milk,1"
    .map(x => x.split(","))
    .map(x => Record(x(0), x(1), x(2)))

  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val df = data.toDF()
  df.printSchema()
  df.show(false)

  // A record arrives as "Id,19048\nTerm,milk\nRank,1"; after flattening,
  // the values sit at the odd indices 1, 3 and 5.
  def getRecord(in: String): String = {
    val ar = in.split("\n").mkString(",").split(",")
    val data = Array(ar(1), ar(3), ar(5))
    data.mkString(",")
  }
}

case class Record(Id: String, Term: String, Rank: String)

Output:

 root
 |-- Id: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- Rank: string (nullable = true)

+-----+----+----+
|Id   |Term|Rank|
+-----+----+----+
|19048|milk|1   |
|19049|corn|5   |
+-----+----+----+
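
For newer Spark versions (2.0+), the same record-delimiter trick works through SparkSession. Below is a minimal, untested sketch under that assumption; the object name RecordSeparatorSession is mine, and parsing each record into a name-to-value map (instead of relying on fixed positions) is my own variation:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SparkSession

object RecordSeparatorSession extends App {
  val spark = SparkSession.builder()
    .appName("test")
    .master("local[1]")
    .getOrCreate()
  import spark.implicits._

  // Same trick: make Hadoop's TextInputFormat split on "REC"
  spark.sparkContext.hadoopConfiguration
    .set("textinputformat.record.delimiter", "REC")

  val df = spark.sparkContext.newAPIHadoopFile("data.txt",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    .map(_._2.toString.trim)
    .filter(_.nonEmpty)
    .map { rec =>
      // Parse each "name,value" line into a map keyed by column name
      val fields = rec.split("\n").map { line =>
        val Array(k, v) = line.split(",", 2)
        (k.trim, v.trim)
      }.toMap
      (fields("Id"), fields("Term"), fields("Rank"))
    }
    .toDF("Id", "Term", "Rank")

  df.show(false)
}

Looking up the values by column name means the code keeps working even if the fields inside a REC block arrive in a different order.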