使用scala将Spark处理的结果转储到HDFS

use*_*013 3 hadoop scala hdfs apache-spark

在使用spark处理数据后,我找到了将数据保存到HDFS的正确方法,我感到有点困惑.

这就是我想要做的.我正在计算数字字段的最小值,最大值和标准差.我的输入文件有数百万行,但输出只有大约15-20个字段.因此,输出是每个字段的单个值(标量).

例如:我将FIELD1的所有行加载到RDD中,最后,我将获得FIELD 1的3个单值(MIN,MAX,SD).我将这三个值连接成临时字符串.最后,我将有15到20行,包含以下格式的4列

FIELD_NAME_1  MIN  MAX  SD
FIELD_NAME_2  MIN  MAX  SD
Run Code Online (Sandbox Code Playgroud)

这是代码的片段:

//create rdd
val data = sc.textFile("hdfs://x.x.x.x/"+args(1)).cache()
//just get the first column
val values = data.map(_.split(",",-1)(1))

val data_double= values.map(x=>if(x==""){0}else{x}.toDouble)
val min_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(true).take(1)(0)._1
val max_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev
Run Code Online (Sandbox Code Playgroud)

所以,我有3个变量,min_value,max_value和SD,我想存储回hdfs.

问题1:由于输出相当小,我是否只在本地保存在服务器上?或者我应该将其转储到HDFS.在我看来就像在本地转储文件更有意义.

问题2:在spark中,我可以调用以下命令将RDD保存到文本文件中

some_RDD.saveAsTextFile("hdfs://namenode/path")
Run Code Online (Sandbox Code Playgroud)

如何在scala中不是RDD的String变量中完成相同的操作?我应该首先将我的结果并行化为RDD,然后调用saveAsTextFile吗?

aar*_*man 12

要在本地保存就行了

some_RDD.collect()

然后用这个问题保存结果数组.是的,如果数据集很小,并且可以轻松地适应内存,则应收集并将其带到程序的驱动程序中.如果数据有点大到存储在内存中的另一种选择就是some_RDD.coalesce(numParitionsToStoreOn).请记住coalesce也需要一个布尔值shuffle,如果你在合并之前对数据进行计算,你应该将其设置为true以在计算上获得更多的并行性.Coalesce将减少调用时存储数据的节点数some_RDD.saveAsTextFile("hdfs://namenode/path").如果文件非常小但是你需要在hdfs上调用repartition(1),这与之相同coalesce(1,true),这将确保你的数据只保存在一个节点上.

更新: 所以如果你想要做的只是在HDFS中保存三个值,你可以这样做. sc.parallelize(List((min_value,max_value,SD)),1).saveAsTextFile("pathTofile")

基本上你只是将3个变量放在一个元组中,将它包装在List中并将并行性设置为1,因为数据非常小


Cho*_*ang 5

答案1:由于你只需要几个标量,我想说将它们存储在本地文件系统中.您可以先做val localValue = rdd.collect(),这将收集工人到掌握的所有数据.然后你调用java.io将内容写入磁盘.

答案2:你可以做sc.parallelize(yourString).saveAsTextFile("hdfs:// host/yourFile").意志写东西到000*.如果您想将所有内容放在一个文件中,请hdfs dfs -getmerge随时为您提供帮助.