为什么Spark需要折叠动作?

Tom*_*ena 4 reduce fold apache-spark rdd pyspark

我有一个涉及fold和减少的愚蠢问题PySpark.我理解这两种方法之间的区别,但是,如果两者都需要应用函数是一个可交换的monoid,我无法弄清楚fold cannot be substituted byreduce`的一个例子.

另外,在fold使用它的PySpark实现中acc = op(obj, acc),为什么使用这个操作顺序而不是acc = op(acc, obj)?(这个第二顺序leftFold对我来说听起来更加封闭)

干杯

托马斯

zer*_*323 13

空RDD

当它RDD为空时它不能被替换:

val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at   
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...

rdd.fold(0)(_ + _)
// Int = 0
Run Code Online (Sandbox Code Playgroud)

你当然可以结合reduce条件,isEmpty但它相当丑陋.

可变缓冲区

折叠的另一个用例是使用可变缓冲区进行聚合.考虑遵循RDD:

import breeze.linalg.DenseVector

val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)
Run Code Online (Sandbox Code Playgroud)

让我们说我们想要所有元素的总和.一个天真的解决方案是简单地减少+:

rdd.reduce(_ + _)
Run Code Online (Sandbox Code Playgroud)

不幸的是,它为每个元素创建了一个新的向量 由于对象创建和后续垃圾收集很昂贵,因此使用可变对象可能更好.这是不可能的reduce(RDD的不变性并不意味着元件的不变性),但是可以用能够实现fold如下:

rdd.fold(DenseVector(0))((acc, x) => acc += x)
Run Code Online (Sandbox Code Playgroud)

这里使用零元素作为每个分区初始化一次的可变缓冲区,保持实际数据不变.

acc = op(obj,acc),为什么使用此操作顺序而不是acc = op(acc,obj)

SPARK-6416SPARK-7683