如何在Spark Scala中使用mapPartitions?

Spa*_*tan 3 scala apache-spark

我有DocsRDD:RDD [String,String]

val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2)
Run Code Online (Sandbox Code Playgroud)

DocsRDD:

Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc4.txt , bla bla \n  .....\n bla bla bla bla \n ... bla
Run Code Online (Sandbox Code Playgroud)

有没有一种高效,优雅的方法从mapPartitions中提取n-gram?到目前为止,我已经尝试了所有的东西,我已经阅读了至少5次关于mapPartitions的一切,但我仍然无法理解如何使用它!似乎太难以操纵了.总之我想:

val NGramsRDD = DocsRDD.map(x => (x._1 , x._2.sliding(n) ) )
Run Code Online (Sandbox Code Playgroud)

但是有效地使用mapPartitions.我对mapPartitions的基本误解是:

OneDocRDD:RDD [String]

 val OneDocRDD = sc.textFile("myDoc1.txt" , 2)
                   .mapPartitions(s1 : Iterator[String] => s2 : Iterator[String])
Run Code Online (Sandbox Code Playgroud)

我不明白这!从s1开始是Iterator [String]?s1是sc.textfile之后的String.

好吧,我的第二个问题是:mapPartitions会改善我在这种情况下对抗地图吗?

最后但并非最不重要:f()可以是:

     f(Iterator[String]) : Iterator[Something else?]
Run Code Online (Sandbox Code Playgroud)

Pas*_*ucy 9

我不确定.mapPartitions是否有用(至少没有给出示例),但使用.mapPartitions看起来像:

val OneDocRDD = sc.textFile("myDoc1.txt", 2)
  .mapPartitions(iter => {
    // here you can initialize objects that you would need 
    // that you want to create once by worker and not for each x in the map. 
    iter.map(x => (x._1 , x._2.sliding(n)))
  })
Run Code Online (Sandbox Code Playgroud)

通常,您希望使用.mapPartitions来创建/初始化您不想要的对象(例如:太大)或无法序列化到工作节点.如果没有.mapPartitions,您需要在.map中创建它们,但由于将为每个x创建对象,因此效率不高.