RDD聚合在火花中

Lij*_*hew 15 scala apache-spark rdd

我是一个Apache星火学习者和所遇到的一个RDD动作aggregate,我有没有它的功能如何线索.有人可以逐步详细解释并详细解释我们如何在此处得到以下代码的结果

RDD input = {1,2,3,3}

RDD Aggregate function :

rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))

output : {9,4}
Run Code Online (Sandbox Code Playgroud)

谢谢

zer*_*323 24

如果您不确定发生了什么,最好遵循类型.ClassTag为简洁而省略,我们从这样的事情开始

abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) ? U, combOp: (U, U) ? U): U 
Run Code Online (Sandbox Code Playgroud)

如果忽略所有其他参数,您将看到这aggregate是一个映射RDD[T]到的函数U.这意味着输入RDD中值的类型不必与输出值的类型相同.所以它明显不同于例如reduce:

def reduce(func: (T, T) ? T): T 
Run Code Online (Sandbox Code Playgroud)

或者fold:

def fold(zeroValue: T)(op: (T, T) => T): T
Run Code Online (Sandbox Code Playgroud)

同样的fold,aggregate需要一个zeroValue.怎么选择它?它应该是一个关于的身份(中立)元素combOp.

您还必须提供两个功能:

  • seqOp从哪里映射(U, T)U
  • combOp从哪里映射(U, U)U

基于此签名,您应该已经看到只能seqOp访问原始数据.它需要一些类型的U另一个类型T的值,并返回一个类型的值U.在您的情况下,它是具有以下签名的函数

((Int, Int), Int) => (Int, Int) 
Run Code Online (Sandbox Code Playgroud)

此时你可能怀疑它被用于某种类似折叠的操作.

第二个函数接受两个类型的参数,U并返回一个类型的值U.如前所述,应该清楚它不会触及原始数据,只能对已经处理过的值进行操作seqOp.在您的情况下,此函数具有如下签名:

((Int, Int), (Int, Int)) => (Int, Int) 
Run Code Online (Sandbox Code Playgroud)

那么我们如何才能将所有这些结合在一起呢?

  1. 首先每个分区使用标准聚合Iterator.aggregatezeroValue,seqOpcombOp作为通过了z,seqopcomboprespectivelly.由于InterruptibleIterator内部使用不覆盖aggregate它应该作为一个简单的执行foldLeft(zeroValue)(seqOp)

  2. 使用,聚合从每个分区收集的下一个部分结果 combOp

让我们假设输入RDD有三个分区,其值分布如下:

  • Iterator(1, 2)
  • Iterator(2, 3)
  • Iterator()

你可以期望执行,忽略绝对顺序,将等同于这样的事情:

val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)

Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
  .map(_.foldLeft((0, 0))(seqOp))
  .reduce(combOp)
Run Code Online (Sandbox Code Playgroud)

foldLeft 对于单个分区,可以如下所示:

Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)
Run Code Online (Sandbox Code Playgroud)

以及所有分区

Seq((3,2), (6,2), (0,0))
Run Code Online (Sandbox Code Playgroud)

哪个组合将给你观察结果:

(3 + 6 + 0, 2 + 2 + 0)
(9, 4)
Run Code Online (Sandbox Code Playgroud)

通常,这是一个常见的模式,您可以在Spark中找到通过中性值的函数,一个用于处理每个分区值的函数和一个用于合并来自不同分区的部分聚合的函数.其他一些例子包括:

  • aggregateByKey
  • 用户定义的聚合函数
  • Aggregators在Spark上Datasets.


Yua*_* Xu 5

以下是我的理解,供大家参考:

假设您有两个节点,一个接受前两个列表元素 {1,2} 的输入,另一个接受 {3, 3}。(这里的分区只是为了方便)

在第一个节点: " (x, y) => (x._1 + y, x._2 + 1) " ,第一个 x 是给定的 (0,0),y 是你的第一个元素 1,你将有输出 (0+1, 0+1),然后是您的第二个元素 y=2,并输出 (1 + 2, 1 + 1),即 (3, 2)

在第二个节点,相同的过程并行发生,您将获得 (6, 2)。

" (x, y) => (x._1 + y._1, x._2 + y._2) ",告诉你合并两个节点,你会得到 (9,4)


值得注意的一件事是 (0,0) 实际上添加到结果 length(rdd)+1 次。

" scala> rdd.aggregate((1,1)) ((x, y) =>(x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1 , x._2 + y._2)) res1: (Int, Int) = (14,9) "