如何将RDD保存到HDFS中以后再读回?

pyt*_*nic 12 scala bigdata hdfs apache-spark rdd

我有一个RDD,其元素是类型(长,字符串).出于某种原因,我想将整个RDD保存到HDFS中,稍后还会在Spark程序中读取该RDD.有可能吗?如果是这样,怎么样?

T. *_*ęda 12

有可能的.

在RDD中你有saveAsObjectFilesaveAsTextFile功能.元组存储为(value1, value2),因此您可以稍后解析它.

可以使用textFileSparkContext中的函数完成读取,然后.map消除()

所以:版本1:

rdd.saveAsTextFile ("hdfs:///test1/");
// later, in other program
val newRdds = sparkContext.textFile("hdfs:///test1/part-*").map (x => {
    // here remove () and parse long / strings
})
Run Code Online (Sandbox Code Playgroud)

版本2:

rdd.saveAsObjectFile ("hdfs:///test1/");
// later, in other program - watch, you have tuples out of the box :)
val newRdds = sparkContext.sc.sequenceFile("hdfs:///test1/part-*", classOf[Long], classOf[String])
Run Code Online (Sandbox Code Playgroud)


Kri*_*ris 6

如果您的 RDD 是表格格式,我会建议使用 DataFrame。数据框是一个表格,或类似二维数组的结构,其中每一列包含一个变量的测量值,每一行包含一个案例。由于其表格格式,DataFrame 具有额外的元数据,这允许 Spark 对最终查询运行某些优化。其中 RDD 是一个弹性分布式数据集,它更像是一个无法优化的黑盒或数据的核心抽象。但是,您可以从 DataFrame 转到 RDD,反之亦然,并且可以通过 toDF 方法从 RDD 转到 DataFrame(如果 RDD 为表格格式)。

以下是在 HDFS 中创建/存储 CSV 和 Parquet 格式的 DataFrame 的示例,

val conf = {
   new SparkConf()
     .setAppName("Spark-HDFS-Read-Write")
 }

 val sqlContext = new SQLContext(sc)

 val sc = new SparkContext(conf)

 val hdfs = "hdfs:///"
 val df = Seq((1, "Name1")).toDF("id", "name")

 //  Writing file in CSV format
 df.write.format("com.databricks.spark.csv").mode("overwrite").save(hdfs + "user/hdfs/employee/details.csv")

 // Writing file in PARQUET format
 df.write.format("parquet").mode("overwrite").save(hdfs + "user/hdfs/employee/details")

 //  Reading CSV files from HDFS
 val dfIncsv = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(hdfs + "user/hdfs/employee/details.csv")

 // Reading PQRQUET files from HDFS
 val dfInParquet = sqlContext.read.parquet(hdfs + "user/hdfs/employee/details")
Run Code Online (Sandbox Code Playgroud)