Tom*_*ena 4 reduce fold apache-spark rdd pyspark
我有一个涉及fold
和减少的愚蠢问题PySpark
.我理解这两种方法之间的区别,但是,如果两者都需要应用函数是一个可交换的monoid,我无法弄清楚fold cannot be substituted by
reduce`的一个例子.
另外,在fold
使用它的PySpark实现中acc = op(obj, acc)
,为什么使用这个操作顺序而不是acc = op(acc, obj)
?(这个第二顺序leftFold
对我来说听起来更加封闭)
干杯
托马斯
zer*_*323 13
空RDD
当它RDD
为空时它不能被替换:
val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...
rdd.fold(0)(_ + _)
// Int = 0
Run Code Online (Sandbox Code Playgroud)
你当然可以结合reduce
条件,isEmpty
但它相当丑陋.
可变缓冲区
折叠的另一个用例是使用可变缓冲区进行聚合.考虑遵循RDD:
import breeze.linalg.DenseVector
val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)
Run Code Online (Sandbox Code Playgroud)
让我们说我们想要所有元素的总和.一个天真的解决方案是简单地减少+
:
rdd.reduce(_ + _)
Run Code Online (Sandbox Code Playgroud)
不幸的是,它为每个元素创建了一个新的向量 由于对象创建和后续垃圾收集很昂贵,因此使用可变对象可能更好.这是不可能的reduce
(RDD的不变性并不意味着元件的不变性),但是可以用能够实现fold
如下:
rdd.fold(DenseVector(0))((acc, x) => acc += x)
Run Code Online (Sandbox Code Playgroud)
这里使用零元素作为每个分区初始化一次的可变缓冲区,保持实际数据不变.
acc = op(obj,acc),为什么使用此操作顺序而不是acc = op(acc,obj)
归档时间: |
|
查看次数: |
2135 次 |
最近记录: |