mcm*_*cmc 10 scala apache-spark
我是Spark的新手,我正在尝试使用它来自的文件名向每个输入行插入一列.
我见过其他人问过类似的问题,但他们使用了所有的答案wholeTextFile,但我正在尝试为更大的CSV文件(使用Spark-CSV库读取),JSON文件和Parquet文件(不仅仅是小文本文件)执行此操作).
我可以使用它spark-shell来获取文件名列表:
val df = sqlContext.read.parquet("/blah/dir")
val names = df.select(inputFileName())
names.show
Run Code Online (Sandbox Code Playgroud)
但这是一个数据帧.我不知道如何将它作为列添加到每一行(如果该结果的排序与初始数据相同,但我认为它总是如此)以及如何将其作为所有输入类型的通用解决方案.
小智 13
我刚刚发现的另一个解决方案是将文件名添加为DataFrame中的一列
val df = sqlContext.read.parquet("/blah/dir")
val dfWithCol = df.withColumn("filename",input_file_name())
Run Code Online (Sandbox Code Playgroud)
当您从文本文件创建 RDD 时,您可能希望将数据映射到案例类中,因此您可以在该阶段添加输入源:
case class Person(inputPath: String, name: String, age: Int)
val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
val rdd = sc.textFile(inputPath).map {
l =>
val tokens = l.split(",")
Person(inputPath, tokens(0), tokens(1).trim().toInt)
}
rdd.collect().foreach(println)
Run Code Online (Sandbox Code Playgroud)
如果您不想将“业务数据”与元数据混合:
case class InputSourceMetaData(path: String, size: Long)
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData)
// Fake the size, for demo purposes only
val md = InputSourceMetaData(inputPath, size = -1L)
val rdd = sc.textFile(inputPath).map {
l =>
val tokens = l.split(",")
PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
}
rdd.collect().foreach(println)
Run Code Online (Sandbox Code Playgroud)
如果将 RDD 提升为 DataFrame:
import sqlContext.implicits._
val df = rdd.toDF()
df.registerTempTable("x")
Run Code Online (Sandbox Code Playgroud)
你可以这样查询
sqlContext.sql("select name, metadata from x").show()
sqlContext.sql("select name, metadata.path from x").show()
sqlContext.sql("select name, metadata.path, metadata.size from x").show()
Run Code Online (Sandbox Code Playgroud)
更新
您可以使用递归方式读取 HDFS 中的文件org.apache.hadoop.fs.FileSystem.listFiles()。
给定值中的文件名列表files(包含 的标准 Scala 集合org.apache.hadoop.fs.LocatedFileStatus),您可以为每个文件创建一个 RDD:
val rdds = files.map { f =>
val md = InputSourceMetaData(f.getPath.toString, f.getLen)
sc.textFile(md.path).map {
l =>
val tokens = l.split(",")
PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
}
}
Run Code Online (Sandbox Code Playgroud)
现在您可以将reduceRDD 列表合并为一个: 该函数将reduce所有 RDD 合并为一个:
val rdd = rdds.reduce(_ ++ _)
rdd.collect().foreach(println)
Run Code Online (Sandbox Code Playgroud)
这可行,但我无法测试这是否可以很好地处理大文件。
| 归档时间: |
|
| 查看次数: |
11309 次 |
| 最近记录: |