使用mapPartition和迭代器保存spark RDD

Chi*_*iku 3 java hadoop scala hdfs apache-spark

我有一些中间数据,我需要存储在HDFS和本地.我正在使用Spark 1.6.在HDFS作为中间形式我正在获取数据/output/testDummy/part-00000/output/testDummy/part-00001.我想使用Java/Scala的地方保存这些分区,这样我可以将它们保存为/users/home/indexes/index.nt(双方在当地的合并),或者/users/home/indexes/index-0000.nt/home/indexes/index-0001.nt分别.

这是我的代码:注意:testDummy与test相同,输出有两个分区.我想单独存储它们或组合它们但是本地存储index.nt文件.我更喜欢分别存储在两个数据节点中.我正在使用集群并在YARN上提交spark工作.我还添加了一些评论,多少次以及我得到的数据.我该怎么办?任何帮助表示赞赏.

 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
 println("testDummy done")   //1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
    println("Inside savesData")                                 //  now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
    println("iter size"+iterator.size)                           //  2 735 2 735 values
    val filenamesWithExtension = outputPath + "/index.nt"
    println("filenamesWithExtension "+filenamesWithExtension.length)   //4 times
    var list = List[(String)]()

    val fileWritter = new FileWriter(filenamesWithExtension,true)
    val bufferWritter = new BufferedWriter(fileWritter)

     while (iterator.hasNext){                       //iterator.hasNext is false
       println("inside iterator")                    //0 times 
       val dat = iterator.next()
       println("datadata "+iterator.next())

       bufferWritter.write(dat + "\n")
       bufferWritter.flush()
       println("index files written")

       val dataElements = dat.split(" ")
       println("dataElements")                                    //0
       list = list.::(dataElements(0))
       list = list.::(dataElements(1))
       list = list.::(dataElements(2))
     }
    bufferWritter.close() //closing
    println("savesData method end")                         //4 times when coal=2
    list.iterator
}

println("before saving data into local")                              //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions)                               //2
println("testRDD size "+test.collect().length)                                //0
println("after saving data into local")   //1
Run Code Online (Sandbox Code Playgroud)

PS:我跟着,这个这个但不完全相同我正在寻找,我做了某种但没有得到任何东西index.nt

zer*_*323 7

有几件事:

  • Iterator.size如果您打算稍后使用数据,请勿致电.Iterators是的TraversableOnce.计算Iterator大小的唯一方法是遍历其所有元素,之后不再需要读取数据.
  • 不要像mapPartitions副作用那样使用转换.如果要执行某些类型的IO使用操作,例如foreach/ foreachPartition.这是一种不好的做法,并不保证给定的代码段只能执行一次.
  • 动作或转换中的本地路径是特定工作者的本地路径.如果要直接在客户端计算机上编写,则应首先使用collect或获取数据toLocalIterator.尽管写入分布式存储并稍后获取数据可能会更好.