Chr*_*now 9 apache-spark pyspark
PySpark文档描述了两个函数:
Run Code Online (Sandbox Code Playgroud)mapPartitions(f, preservesPartitioning=False) Return a new RDD by applying a function to each partition of this RDD. >>> rdd = sc.parallelize([1, 2, 3, 4], 2) >>> def f(iterator): yield sum(iterator) >>> rdd.mapPartitions(f).collect() [3, 7]
而......
Run Code Online (Sandbox Code Playgroud)mapPartitionsWithIndex(f, preservesPartitioning=False) Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition. >>> rdd = sc.parallelize([1, 2, 3, 4], 4) >>> def f(splitIndex, iterator): yield splitIndex >>> rdd.mapPartitionsWithIndex(f).sum() 6
这些功能试图解决哪些用例?我不明白他们为什么会被要求.
Mri*_*nal 23
要回答这个问题,我们需要将map与mapPartitions/mapPartitionsWithIndex进行比较(mapPartitions和mapPartitionsWithIndex几乎做同样的事情,除了mapPartitionsWithIndex你可以跟踪正在处理的分区).
现在mapPartitions和mapPartitionsWithIndex用于优化应用程序的性能.仅仅为了理解,让我们说RDD中的所有元素都是XML元素,并且您需要一个解析器来处理它们中的每一个.所以你必须采用一个好的解析器类的实例来继续前进.你可以用两种方式做到:
map + foreach:在这种情况下,对于每个元素,将创建一个解析器类的实例,该元素将被处理,然后实例将被及时销毁,但此实例将不会用于其他元素.因此,如果您正在使用分布在4个分区中的12个元素的RDD,则将创建12次解析器实例.正如您所知,创建实例是一项非常昂贵的操作,因此需要时间.
mapPartitions/mapPartitionsWithIndex:这两种方法能够稍微解决上述情况.mapPartitions/mapPartitionsWithIndex适用于分区,而不是元素(请不要误解,所有元素都将被处理).这些方法将为每个分区创建一次解析器实例.由于您只有4个分区,因此解析器实例将被创建4次(对于此示例,将比map少8倍).但是,您将传递给这些方法的函数应该采用Iterator对象(将一个分区的所有元素作为输入一次获取).因此,在mapPartitions和mapPartitionsWithIndex的情况下,将创建解析器实例,将处理当前分区的所有元素,然后GC将稍后销毁该实例.您会发现它们可以显着提高应用程序的性能.
所以最重要的是,每当你看到某些操作对所有元素都是通用的时候,一般来说,你可以做一次并且可以处理所有这些操作,最好使用mapPartitions/mapPartitionsWithIndex.
请通过以下代码示例找到以下两个链接进行说明:https : //bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/ http://apachesparkbook.blogspot.in/2015/ 11/mappartition-example.html的
| 归档时间: |
|
| 查看次数: |
16897 次 |
| 最近记录: |