use*_*013 3 hadoop scala hdfs apache-spark
在使用spark处理数据后,我找到了将数据保存到HDFS的正确方法,我感到有点困惑.
这就是我想要做的.我正在计算数字字段的最小值,最大值和标准差.我的输入文件有数百万行,但输出只有大约15-20个字段.因此,输出是每个字段的单个值(标量).
例如:我将FIELD1的所有行加载到RDD中,最后,我将获得FIELD 1的3个单值(MIN,MAX,SD).我将这三个值连接成临时字符串.最后,我将有15到20行,包含以下格式的4列
FIELD_NAME_1 MIN MAX SD
FIELD_NAME_2 MIN MAX SD
Run Code Online (Sandbox Code Playgroud)
这是代码的片段:
//create rdd
val data = sc.textFile("hdfs://x.x.x.x/"+args(1)).cache()
//just get the first column
val values = data.map(_.split(",",-1)(1))
val data_double= values.map(x=>if(x==""){0}else{x}.toDouble)
val min_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(true).take(1)(0)._1
val max_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev
Run Code Online (Sandbox Code Playgroud)
所以,我有3个变量,min_value,max_value和SD,我想存储回hdfs.
问题1:由于输出相当小,我是否只在本地保存在服务器上?或者我应该将其转储到HDFS.在我看来就像在本地转储文件更有意义.
问题2:在spark中,我可以调用以下命令将RDD保存到文本文件中
some_RDD.saveAsTextFile("hdfs://namenode/path")
Run Code Online (Sandbox Code Playgroud)
如何在scala中不是RDD的String变量中完成相同的操作?我应该首先将我的结果并行化为RDD,然后调用saveAsTextFile吗?
aar*_*man 12
要在本地保存就行了
some_RDD.collect()
然后用这个问题保存结果数组.是的,如果数据集很小,并且可以轻松地适应内存,则应收集并将其带到程序的驱动程序中.如果数据有点大到存储在内存中的另一种选择就是some_RDD.coalesce(numParitionsToStoreOn).请记住coalesce也需要一个布尔值shuffle,如果你在合并之前对数据进行计算,你应该将其设置为true以在计算上获得更多的并行性.Coalesce将减少调用时存储数据的节点数some_RDD.saveAsTextFile("hdfs://namenode/path").如果文件非常小但是你需要在hdfs上调用repartition(1),这与之相同coalesce(1,true),这将确保你的数据只保存在一个节点上.
更新:
所以如果你想要做的只是在HDFS中保存三个值,你可以这样做.
sc.parallelize(List((min_value,max_value,SD)),1).saveAsTextFile("pathTofile")
基本上你只是将3个变量放在一个元组中,将它包装在List中并将并行性设置为1,因为数据非常小
答案1:由于你只需要几个标量,我想说将它们存储在本地文件系统中.您可以先做val localValue = rdd.collect(),这将收集工人到掌握的所有数据.然后你调用java.io将内容写入磁盘.
答案2:你可以做sc.parallelize(yourString).saveAsTextFile("hdfs:// host/yourFile").意志写东西到000*.如果您想将所有内容放在一个文件中,请hdfs dfs -getmerge随时为您提供帮助.
| 归档时间: |
|
| 查看次数: |
15521 次 |
| 最近记录: |