我还没有理解聚合函数:
例如,拥有:
val x = List(1,2,3,4,5,6)
val y = x.par.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x,y) => (x._1 + y._1, x._2 + y._2))
Run Code Online (Sandbox Code Playgroud)
结果将是: (21,6)
好吧,我认为这(x,y) => (x._1 + y._1, x._2 + y._2)是为了并行获得结果,例如它将是(1 + 2,1 + 1),依此类推.
但正是这一部分让我感到困惑:
(x, y) => (x._1 + y, x._2 + 1)
Run Code Online (Sandbox Code Playgroud)
为什么x._1 + y?这x._2是0?
提前致谢.
Ras*_*hod 37
首先感谢Diego的回复,它帮助我理解了aggregate()函数.
让我承认我昨晚无法入睡,因为我无法得到聚合()在内部如何运作,今晚我会好好睡一觉:-)
让我们开始理解它
val result = List(1,2,3,4,5,6,7,8,9,10).par.aggregate((0, 0))
(
(x, y) => (x._1 + y, x._2 + 1),
(x,y) =>(x._1 + y._1, x._2 + y._2)
)
Run Code Online (Sandbox Code Playgroud)
结果:(Int,Int)=(55,10)
让我们独立理解所有3个部分:
Aggregate()以accumulators x的初始值开始,这里是(0,0).最初为0的第一个元组x._1用于计算总和,第二个元组x._2用于计算列表中元素的总数.
如果您知道foldLeft如何在scala中工作,那么应该很容易理解这一部分.上面的函数就像我们列表中的foldLeft(1,2,3,4 ... 10)一样.
Iteration# (x._1 + y, x._2 + 1)
1 (0+1, 0+1)
2 (1+2, 1+1)
3 (3+3, 2+1)
4 (6+4, 3+1)
. ....
. ....
10 (45+10, 9+1)
Run Code Online (Sandbox Code Playgroud)
因此,在所有10次迭代后,你将得到结果(55,10).如果你理解这一部分其余部分非常容易,但对我而言,理解所有必需的计算是否完成后最困难的部分是什么才能使用第二部分即compop - 敬请关注:-)
那么第3部分是combOp,它结合了并行化过程中不同线程生成的结果,记得我们在代码中使用'par'来实现list的并行计算:
列表(1,2,3,4,5,6,7,8,9,10).par.aggregate(....)
Apache spark有效地使用聚合函数来进行RDD的并行计算.
假设我们的列表(1,2,3,4,5,6,7,8,9,10)由3个并行线程计算.这里每个线程都在处理部分列表,然后我们的aggregate()combOp将使用以下代码组合每个线程的计算结果:
(x,y) =>(x._1 + y._1, x._2 + y._2)
Run Code Online (Sandbox Code Playgroud)
原始清单:清单(1,2,3,4,5,6,7,8,9,10)
Thread1开始计算部分列表说(1,2,3,4),Thread2计算(5,6,7,8)和Thread3计算部分列表说(9,10)
在计算结束时,Thread-1结果为(10,4),Thread-2结果为(26,4),Thread-3结果为(19,2).
在并行计算结束时,我们将((10,4),(26,4),(19,2))
Iteration# (x._1 + y._1, x._2 + y._2)
1 (0+10, 0+4)
2 (10+26, 4+4)
3 (36+19, 8+2)
Run Code Online (Sandbox Code Playgroud)
这是(55,10).
最后让我重新尝试seqOp作业是计算列表的所有元素和列表总数的总和,而组合函数的工作是组合并行化期间生成的不同部分结果.
我希望上面的解释可以帮助你理解aggregate().
Nat*_*ate 21
从文档:
def aggregate[B](z: ? B)(seqop: (B, A) ? B, combop: (B, B) ? B): B
Run Code Online (Sandbox Code Playgroud)
将运算符的结果聚合到后续元素.
这是折叠和减少的更一般形式.它具有类似的语义,但不要求结果是元素类型的超类型.它按顺序遍历不同分区中的元素,使用seqop更新结果,然后将combop应用于来自不同分区的结果.该操作的实现可以在任意数量的集合分区上操作,因此可以任意次数调用组合.
例如,人们可能想要处理一些元素然后生成一个Set.在这种情况下,seqop会处理一个元素并将其附加到列表中,而combop会将来自不同分区的两个列表连接在一起.初始值z将是空集.
pc.aggregate(Set[Int]())(_ += process(_), _ ++ _)另一个例子是从一系列双打中计算几何平均数(一个通常需要大双打).B累积结果的类型z分区累积结果的初始值 - 这通常是seqop运算符的中性元素(例如列表连接为Nil或求和为0),并且可以多次计算运算符seqop用于在分区内累积结果的组合用于组合来自不同分区的结果的关联运算符
在你的例子中B是一个Tuple2[Int, Int].seqop然后,该方法从列表中获取单个元素,作为范围y,并将聚合更新B为(x._1 + y, x._2 + 1).所以它增加了元组中的第二个元素.这有效地将元素的总和放入元组的第一个元素中,将元素的数量放入元组的第二个元素中.
combop然后,该方法从每个并行执行线程获取结果并将它们组合.通过添加进行组合提供了与按顺序在列表上运行相同的结果.
使用B作为元组可能是令人困惑的一部分.您可以将问题分解为两个子问题,以更好地了解这是在做什么.res0是结果元组中的第一个元素,它是结果元组res1中的第二个元素.
// Sums all elements in parallel.
scala> x.par.aggregate(0)((x, y) => x + y, (x, y) => x + y)
res0: Int = 21
// Counts all elements in parallel.
scala> x.par.aggregate(0)((x, y) => x + 1, (x, y) => x + y)
res1: Int = 6
Run Code Online (Sandbox Code Playgroud)
aggregate需要3个参数:种子值,计算函数和组合函数.
它的作用基本上是将集合拆分为多个线程,使用计算函数计算部分结果,然后使用组合函数组合所有这些部分结果.
据我所知,你的示例函数将返回一对(a,b),其中a是列表中值的总和,b是列表中值的数量.的确,(21,6).
这是如何运作的?种子值是(0,0)对.对于空列表,我们有一个0和0的项目,所以这是正确的.
您的计算函数采用(Int,Int)对x(这是您的部分结果)和Int y(列表中的下一个值).这是你的:
(x, y) => (x._1 + y, x._2 + 1)
Run Code Online (Sandbox Code Playgroud)
实际上,我们想要的结果是将y的左元素(累加器)增加y,并将x(计数器)的右元素增加1.
您的组合函数采用(Int,Int)对x和(Int,Int)对y,它们是来自不同并行计算的两个部分结果,并将它们组合在一起:
(x,y) => (x._1 + y._1, x._2 + y._2)
Run Code Online (Sandbox Code Playgroud)
实际上,我们独立地将对的左侧部分和对的右侧部分相加.
你的困惑来自于第一个函数中的x和y与第二个函数中的x和y不同.在第一个函数中,您有种子值类型的x和集合元素类型的y,并返回x类型的结果.在第二个函数中,您的两个参数都是种子值的相同类型.
希望现在更清楚了!