pyt*_*nic 12 scala bigdata hdfs apache-spark rdd
我有一个RDD,其元素是类型(长,字符串).出于某种原因,我想将整个RDD保存到HDFS中,稍后还会在Spark程序中读取该RDD.有可能吗?如果是这样,怎么样?
T. *_*ęda 12
有可能的.
在RDD中你有saveAsObjectFile和saveAsTextFile功能.元组存储为(value1, value2),因此您可以稍后解析它.
可以使用textFileSparkContext中的函数完成读取,然后.map消除()
所以:版本1:
rdd.saveAsTextFile ("hdfs:///test1/");
// later, in other program
val newRdds = sparkContext.textFile("hdfs:///test1/part-*").map (x => {
// here remove () and parse long / strings
})
Run Code Online (Sandbox Code Playgroud)
版本2:
rdd.saveAsObjectFile ("hdfs:///test1/");
// later, in other program - watch, you have tuples out of the box :)
val newRdds = sparkContext.sc.sequenceFile("hdfs:///test1/part-*", classOf[Long], classOf[String])
Run Code Online (Sandbox Code Playgroud)
如果您的 RDD 是表格格式,我会建议使用 DataFrame。数据框是一个表格,或类似二维数组的结构,其中每一列包含一个变量的测量值,每一行包含一个案例。由于其表格格式,DataFrame 具有额外的元数据,这允许 Spark 对最终查询运行某些优化。其中 RDD 是一个弹性分布式数据集,它更像是一个无法优化的黑盒或数据的核心抽象。但是,您可以从 DataFrame 转到 RDD,反之亦然,并且可以通过 toDF 方法从 RDD 转到 DataFrame(如果 RDD 为表格格式)。
以下是在 HDFS 中创建/存储 CSV 和 Parquet 格式的 DataFrame 的示例,
val conf = {
new SparkConf()
.setAppName("Spark-HDFS-Read-Write")
}
val sqlContext = new SQLContext(sc)
val sc = new SparkContext(conf)
val hdfs = "hdfs:///"
val df = Seq((1, "Name1")).toDF("id", "name")
// Writing file in CSV format
df.write.format("com.databricks.spark.csv").mode("overwrite").save(hdfs + "user/hdfs/employee/details.csv")
// Writing file in PARQUET format
df.write.format("parquet").mode("overwrite").save(hdfs + "user/hdfs/employee/details")
// Reading CSV files from HDFS
val dfIncsv = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(hdfs + "user/hdfs/employee/details.csv")
// Reading PQRQUET files from HDFS
val dfInParquet = sqlContext.read.parquet(hdfs + "user/hdfs/employee/details")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
22479 次 |
| 最近记录: |