pra*_*ash 1 hadoop scala apache-spark spark-dataframe
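I have a text file in which the string REC is the record delimiter and a line break is the column delimiter. Each value is prefixed with its column name, separated by a comma. Here is a sample of the data format:
REC
Id,19048
Term,Milk
Rank,1
REC
Id,19049
Term,Corn
Rank,5
REC is used as the record delimiter. Now I want to create a Spark DataFrame with the column names Id, Term, and Rank.
Here is working code: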
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object RecordSeparator extends App {
  val conf = new SparkConf()
    .setAppName("test")
    .setMaster("local[1]")
    .setExecutorEnv("executor-cores", "2")
  val sc = new SparkContext(conf)

  // Split the input on "REC" instead of on line breaks.
  val hconf = new Configuration
  hconf.set("textinputformat.record.delimiter", "REC")

  val data = sc.newAPIHadoopFile("data.txt",
      classOf[TextInputFormat], classOf[LongWritable],
      classOf[Text], hconf)
    .map(x => x._2.toString.trim)
    .filter(x => x != "")
    .map(x => getRecord(x))
    .map(x => x.split(","))
    .map(x => record(x(0), x(1), x(2))) // Id, Term, Rank

  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._ // brings toDF() into scope
  val df = data.toDF()
  df.printSchema()
  df.show(false)

  // Flatten one record, e.g. "Id,19048\nTerm,Milk\nRank,1" becomes "19048,Milk,1".
  def getRecord(in: String): String = {
    val ar = in.split("\n").mkString(",").split(",")
    val data = Array(ar(1), ar(3), ar(5))
    data.mkString(",")
  }
}

case class record(Id: String, Term: String, Rank: String)

Output:
root
 |-- Id: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- Rank: string (nullable = true)

+-----+----+----+
|Id   |Term|Rank|
+-----+----+----+
|19048|Milk|1   |
|19049|Corn|5   |
+-----+----+----+
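The getRecord helper above picks values by fixed position (ar(1), ar(3), ar(5)), so it depends on every record listing Id, Term, and Rank in exactly that order. As a rough sketch only, the parsing step could instead look values up by field name. The names Record, RecordParser, toFields, parseRecord, and parseAll below are hypothetical and not part of the original code, and the block is plain Scala with no Spark dependency so the parsing logic can be tried on its own.

```scala
// Hypothetical helper, not from the original answer: parses the
// REC-delimited format by field name rather than by position.
final case class Record(id: String, term: String, rank: String)

object RecordParser {
  // Turn one record body such as "Id,19048\nTerm,Milk\nRank,1" into a
  // name -> value map. Lines without a comma are skipped.
  def toFields(body: String): Map[String, String] =
    body.split("\n").iterator
      .map(_.trim)
      .filter(_.contains(","))
      .map { line =>
        val parts = line.split(",", 2) // split on the first comma only
        parts(0).trim -> parts(1).trim
      }
      .toMap

  // Build a Record only when all three expected fields are present.
  def parseRecord(body: String): Option[Record] = {
    val fields = toFields(body)
    for {
      id   <- fields.get("Id")
      term <- fields.get("Term")
      rank <- fields.get("Rank")
    } yield Record(id, term, rank)
  }

  // Split a whole file's text on the "REC" marker and parse each chunk.
  // Assumes "REC" never occurs inside a value.
  def parseAll(text: String): List[Record] =
    text.split("REC").toList
      .map(_.trim)
      .filter(_.nonEmpty)
      .flatMap(chunk => parseRecord(chunk).toList)
}
```

In the Spark job, something like parseRecord could presumably stand in for getRecord plus the positional split, applied with flatMap to the RDD of trimmed record strings so that incomplete records are dropped instead of throwing an index error.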