何时使用mapParitions和mapPartitionsWithIndex?

Chr*_*now 9 apache-spark pyspark

PySpark文档描述了两个函数:

mapPartitions(f, preservesPartitioning=False)

   Return a new RDD by applying a function to each partition of this RDD.

   >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
   >>> def f(iterator): yield sum(iterator)
   >>> rdd.mapPartitions(f).collect()
   [3, 7]
Run Code Online (Sandbox Code Playgroud)

而......

mapPartitionsWithIndex(f, preservesPartitioning=False)

   Return a new RDD by applying a function to each partition of this RDD, 
   while tracking the index of the original partition.

   >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
   >>> def f(splitIndex, iterator): yield splitIndex
   >>> rdd.mapPartitionsWithIndex(f).sum()
   6
Run Code Online (Sandbox Code Playgroud)

这些功能试图解决哪些用例?我不明白他们为什么会被要求.

Mri*_*nal 23

要回答这个问题,我们需要将map与mapPartitions/mapPartitionsWithIndex进行比较(mapPartitions和mapPartitionsWithIndex几乎做同样的事情,除了mapPartitionsWithIndex你可以跟踪正在处理的分区).

现在mapPartitions和mapPartitionsWithIndex用于优化应用程序的性能.仅仅为了理解,让我们说RDD中的所有元素都是XML元素,并且您需要一个解析器来处理它们中的每一个.所以你必须采用一个好的解析器类的实例来继续前进.你可以用两种方式做到:

map + foreach:在这种情况下,对于每个元素,将创建一个解析器类的实例,该元素将被处理,然后实例将被及时销毁,但此实例将不会用于其他元素.因此,如果您正在使用分布在4个分区中的12个元素的RDD,则将创建12次解析器实例.正如您所知,创建实例是一项非常昂贵的操作,因此需要时间.

mapPartitions/mapPartitionsWithIndex:这两种方法能够稍微解决上述情况.mapPartitions/mapPartitionsWithIndex适用于分区,而不是元素(请不要误解,所有元素都将被处理).这些方法将为每个分区创建一次解析器实例.由于您只有4个分区,因此解析器实例将被创建4次(对于此示例,将比map少8倍).但是,您将传递给这些方法的函数应该采用Iterator对象(将一个分区的所有元素作为输入一次获取).因此,在mapPartitions和mapPartitionsWithIndex的情况下,将创建解析器实例,将处理当前分区的所有元素,然后GC将稍后销毁该实例.您会发现它们可以显着提高应用程序的性能.

所以最重要的是,每当你看到某些操作对所有元素都是通用的时候,一般来说,你可以做一次并且可以处理所有这些操作,最好使用mapPartitions/mapPartitionsWithIndex.

请通过以下代码示例找到以下两个链接进行说明:https : //bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/ http://apachesparkbook.blogspot.in/2015/ 11/mappartition-example.html的

  • **`map`** + [**`foreachPartition`**](/sf/ask/2133929101/)怎么样?这不会只实例化“n_partitions”个解析器,而不是“n_total_elements”吗? (2认同)

归档时间:

查看次数:

16897 次

最近记录:

8 年,10 月 前