Spark Scala:此用例中的 mapPartitions

mjb*_*gll 3 scala apache-spark

我阅读了很多关于 map 和 mapPartitions 之间差异的信息。我还是有些疑惑。事情是在阅读后我决定更改代码中的map函数,mapPartitions因为显然mapPartitionsmap.

我的问题是要确定我的决定在以下场景中是否正确(注释显示了以前的代码):

val reducedRdd = rdd.mapPartitions(partition => partition.map(r => (r.id, r)))
//val reducedRdd = rdd.map(r => (r.id, r))
.reduceByKey((r1, r2) => r1.combineElem(r2))
// .map(e => e._2)
.mapPartitions(partition => partition.map(e => e._2))
Run Code Online (Sandbox Code Playgroud)

我想对了吗?谢谢!

mor*_*007 5

在您的情况下, mapPartitions 不应该有任何区别。

地图分区与地图

当我们想要为每个分区执行一些通用计算时,mapPartitions 很有用。例子 -

rdd.mapPartitions{
  partition => 
    val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>
    partition.map {
      row => (row.id, complicatedRowConverter(row) )
    }
}
Run Code Online (Sandbox Code Playgroud)

在上面的例子中,我们正在创建一个complicatedRowConverter从一些昂贵的计算派生的函数。这个函数对于整个 RDD 分区都是一样的,我们不需要一次又一次地重新创建它。做同样事情的另一种方法可以是 -

rdd.map { row => 
      val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>
      (row.id, complicatedRowConverter(row) )
    }
}
Run Code Online (Sandbox Code Playgroud)

这会很慢,因为我们不必要地为每一行运行这个语句 - val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>

在您的情况下,您没有任何预计算或每个分区的任何其他内容。在 mapPartition 中,您只需遍历每一行并将其映射到(row.id, row).

所以这里的 mapPartition 不会受益,你可以使用简单的地图。