Chi*_*iku 3 java hadoop scala hdfs apache-spark
我有一些中间数据,我需要存储在HDFS和本地.我正在使用Spark 1.6.在HDFS作为中间形式我正在获取数据/output/testDummy/part-00000和/output/testDummy/part-00001.我想使用Java/Scala的地方保存这些分区,这样我可以将它们保存为/users/home/indexes/index.nt(双方在当地的合并),或者/users/home/indexes/index-0000.nt和/home/indexes/index-0001.nt分别.
这是我的代码:注意:testDummy与test相同,输出有两个分区.我想单独存储它们或组合它们但是本地存储index.nt文件.我更喜欢分别存储在两个数据节点中.我正在使用集群并在YARN上提交spark工作.我还添加了一些评论,多少次以及我得到的数据.我该怎么办?任何帮助表示赞赏.
val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
println("testDummy done") //1 time print
def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
println("iter size"+iterator.size) // 2 735 2 735 values
val filenamesWithExtension = outputPath + "/index.nt"
println("filenamesWithExtension "+filenamesWithExtension.length) //4 times
var list = List[(String)]()
val fileWritter = new FileWriter(filenamesWithExtension,true)
val bufferWritter = new BufferedWriter(fileWritter)
while (iterator.hasNext){ //iterator.hasNext is false
println("inside iterator") //0 times
val dat = iterator.next()
println("datadata "+iterator.next())
bufferWritter.write(dat + "\n")
bufferWritter.flush()
println("index files written")
val dataElements = dat.split(" ")
println("dataElements") //0
list = list.::(dataElements(0))
list = list.::(dataElements(1))
list = list.::(dataElements(2))
}
bufferWritter.close() //closing
println("savesData method end") //4 times when coal=2
list.iterator
}
println("before saving data into local") //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions) //2
println("testRDD size "+test.collect().length) //0
println("after saving data into local") //1
Run Code Online (Sandbox Code Playgroud)
有几件事:
Iterator.size如果您打算稍后使用数据,请勿致电.Iterators是的TraversableOnce.计算Iterator大小的唯一方法是遍历其所有元素,之后不再需要读取数据.mapPartitions副作用那样使用转换.如果要执行某些类型的IO使用操作,例如foreach/ foreachPartition.这是一种不好的做法,并不保证给定的代码段只能执行一次.collect或获取数据toLocalIterator.尽管写入分布式存储并稍后获取数据可能会更好.| 归档时间: |
|
| 查看次数: |
1565 次 |
| 最近记录: |