ab_*_*_sp 50 python lambda aggregate apache-spark rdd
我正在寻找一些更好的解释python中通过spark提供的聚合功能.
我的例子如下(使用Spark 1.2.0版本的pyspark)
sc.parallelize([1,2,3,4]).aggregate(
(0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)
输出:
(10, 4)
Run Code Online (Sandbox Code Playgroud)
我得到的预期结果(10,4)是1+2+3+44个元素的总和.如果我改变传递给聚合函数初始值(1,0)从(0,0) 我得到以下结果
sc.parallelize([1,2,3,4]).aggregate(
(1, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)
输出:
(19, 4)
Run Code Online (Sandbox Code Playgroud)
该值增加9.如果我将其更改为(2,0),则值将转到(28,4)依此类推.
有人可以向我解释这个值的计算方法吗?我预计价值会上升1而不是9,(11,4)我预计会看到(19,4).
gsa*_*ras 87
我对接受的答案并不完全相信,JohnKnight的答案也有所帮助,所以这是我的观点:
首先,让我用自己的话来解释aggregate():
原型:
aggregate(zeroValue,seqOp,combOp)
说明:
aggregate() 允许您获取RDD并生成与原始RDD中存储的类型不同的单个值.
参数:
zeroValue:结果的初始化值,采用所需格式.seqOp:要应用于RDD记录的操作.对分区中的每个记录运行一次.combOp:定义结果对象(每个分区一个)的组合方式.示例:
计算列表的总和和该列表的长度.将结果返回到一对
(sum, length).
在Spark shell中,我首先创建了一个包含4个元素的列表,其中包含2个分区:
listRDD = sc.parallelize([1,2,3,4], 2)
Run Code Online (Sandbox Code Playgroud)
然后我定义了我的seqOp:
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
Run Code Online (Sandbox Code Playgroud)
和我的组合:
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
Run Code Online (Sandbox Code Playgroud)
然后我汇总了:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
Run Code Online (Sandbox Code Playgroud)
如您所见,我为变量提供了描述性名称,但让我进一步解释:
第一个分区有子列表[1,2].我们将seqOp应用于该列表的每个元素,这将产生一个本地结果,一对(sum, length)将仅在第一个分区中本地反映结果的结果.
所以,让我们开始:local_result初始化为zeroValue我们提供的参数aggregate(),即(0,0)并且list_element是列表的第一个元素,即1.结果这是发生的事情:
0 + 1 = 1
0 + 1 = 1
Run Code Online (Sandbox Code Playgroud)
现在,本地结果是(1,1),这意味着,到目前为止,对于第一个分区,在仅处理第一个元素之后,总和为1,长度为1.注意,local_result从(0,0)更新),至(1,1).
1 + 2 = 3
1 + 1 = 2
Run Code Online (Sandbox Code Playgroud)
现在本地结果是(3,2),它将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素.
为第二个分区做同样的事情,得到(7,2).
现在我们将combOp应用于每个局部结果,以便我们可以形成最终的全局结果,如下所示: (3,2) + (7,2) = (10, 4)
'figure'中描述的示例:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
Run Code Online (Sandbox Code Playgroud)
灵感来自这个伟大的榜样.
所以现在如果zeroValue不是(0,0),但是(1,0),人们会期望得到(8 + 4,2 + 2)=(12,4),这并不能解释你的体验.即使我们改变了我的例子的分区数量,我也无法再次获得.
这里的关键是JohnKnight的回答,该回答表明zeroValue它不仅类似于分区数量,而且可能应用的次数超出预期.
maa*_*asg 30
Aggregate允许您随意转换和组合RDD的值.
它使用两个功能:
第一个转换并在本地聚合[U]中添加原始集合[T]的元素,并采用以下形式:(U,T)=> U.您可以将其视为折叠,因此它也需要零对于那个操作.此操作并行地应用于每个分区.
这里是问题的关键所在:这里应该使用的唯一值是还原操作的ZERO值.此操作在每个分区上本地执行,因此,向该零值添加任何内容将添加到结果乘以RDD的分区数.
第二个操作采用前一个操作[U]的结果类型的2个值,并将其组合成一个值.此操作将减少每个分区的部分结果并生成实际总数.
例如:给定一个字符串的RDD:
val rdd:RDD[String] = ???
Run Code Online (Sandbox Code Playgroud)
假设您想要该RDD中字符串长度的总和,那么您可以这样做:
1)第一个操作将字符串转换为size(int)并累积size的值.
val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.lenght`
Run Code Online (Sandbox Code Playgroud)
2)为加法运算提供ZERO(0)
val ZERO = 0
Run Code Online (Sandbox Code Playgroud)
3)将两个整数加在一起的操作:
val add: (Int, Int) => Int = _ + _
Run Code Online (Sandbox Code Playgroud)
把它们放在一起:
rdd.aggregate(ZERO, stringSizeCummulator, add)
Run Code Online (Sandbox Code Playgroud)
那么,为什么ZERO需要呢?当累加器函数应用于分区的第一个元素时,没有运行总计.ZERO在这里使用.
例如.我的RDD是: - 分区1:["跳转","结束"] - 分区2:["the","wall"]
这将导致:
P1:
P2:
减少:添加(P1,P2)= 15
Joh*_*ght 17
我没有足够的声誉点来评论Maasg之前的回答.实际上零值对于seqop应该是"中性"的,这意味着它不会干扰seqop结果,如0朝向add,或1朝*;
你不应该尝试使用非中性值,因为它可能会被随意应用.此行为不仅与分区数量相关联.
我尝试了问题中所述的相同实验.使用1分区,零值应用3次.有2个分区,6次.有3个分区,9次,这将继续下去.
| 归档时间: |
|
| 查看次数: |
34233 次 |
| 最近记录: |