解释Spark中的聚合功能

ab_*_*_sp 50 python lambda aggregate apache-spark rdd

我正在寻找一些更好的解释python中通过spark提供的聚合功能.

我的例子如下(使用Spark 1.2.0版本的pyspark)

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)

输出:

(10, 4)
Run Code Online (Sandbox Code Playgroud)

我得到的预期结果(10,4)1+2+3+44个元素的总和.如果我改变传递给聚合函数初始值(1,0)(0,0) 我得到以下结果

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)

输出:

(19, 4)
Run Code Online (Sandbox Code Playgroud)

该值增加9.如果我将其更改为(2,0),则值将转到(28,4)依此类推.

有人可以向我解释这个值的计算方法吗?我预计价值会上升1而不是9,(11,4)我预计会看到(19,4).

gsa*_*ras 87

我对接受的答案并不完全相信,JohnKnight的答案也有所帮助,所以这是我的观点:

首先,让我用自己的话来解释aggregate():

原型:

aggregate(zeroValue,seqOp,combOp)

说明:

aggregate() 允许您获取RDD并生成与原始RDD中存储的类型不同的单个值.

参数:

  1. zeroValue:结果的初始化值,采用所需格式.
  2. seqOp:要应用于RDD记录的操作.对分区中的每个记录运行一次.
  3. combOp:定义结果对象(每个分区一个)的组合方式.

示例:

计算列表的总和和该列表的长度.将结果返回到一对(sum, length).

在Spark shell中,我首先创建了一个包含4个元素的列表,其中包含2个分区:

listRDD = sc.parallelize([1,2,3,4], 2)
Run Code Online (Sandbox Code Playgroud)

然后我定义了我的seqOp:

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
Run Code Online (Sandbox Code Playgroud)

和我的组合:

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
Run Code Online (Sandbox Code Playgroud)

然后我汇总了:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
Run Code Online (Sandbox Code Playgroud)

如您所见,我为变量提供了描述性名称,但让我进一步解释:

第一个分区有子列表[1,2].我们将seqOp应用于该列表的每个元素,这将产生一个本地结果,一对(sum, length)将仅在第一个分区中本地反映结果的结果.

所以,让我们开始:local_result初始化为zeroValue我们提供的参数aggregate(),即(0,0)并且list_element是列表的第一个元素,即1.结果这是发生的事情:

0 + 1 = 1
0 + 1 = 1
Run Code Online (Sandbox Code Playgroud)

现在,本地结果是(1,1),这意味着,到目前为止,对于第一个分区,在仅处理第一个元素之后,总和为1,长度为1.注意,local_result从(0,0)更新),至(1,1).

1 + 2 = 3
1 + 1 = 2
Run Code Online (Sandbox Code Playgroud)

现在本地结果是(3,2),它将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素.

为第二个分区做同样的事情,得到(7,2).

现在我们将combOp应用于每个局部结果,以便我们可以形成最终的全局结果,如下所示: (3,2) + (7,2) = (10, 4)


'figure'中描述的示例:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)
Run Code Online (Sandbox Code Playgroud)

灵感来自这个伟大的榜样.


所以现在如果zeroValue不是(0,0),但是(1,0),人们会期望得到(8 + 4,2 + 2)=(12,4),这并不能解释你的体验.即使我们改变了我的例子的分区数量,我也无法再次获得.

这里的关键是JohnKnight的回答,该回答表明zeroValue它不仅类似于分区数量,而且可能应用的次数超出预期.


maa*_*asg 30

Aggregate允许您随意转换和组合RDD的值.

它使用两个功能:

第一个转换并在本地聚合[U]中添加原始集合[T]的元素,并采用以下形式:(U,T)=> U.您可以将其视为折叠,因此它也需要零对于那个操作.此操作并行地应用于每个分区.

这里是问题的关键所在:这里应该使用的唯一值是还原操作的ZERO值.此操作在每个分区上本地执行,因此,向该零值添加任何内容将添加到结果乘以RDD的分区数.

第二个操作采用前一个操作[U]的结果类型的2个值,并将其组合成一个值.此操作将减少每个分区的部分结果并生成实际总数.

例如:给定一个字符串的RDD:

val rdd:RDD[String] = ???
Run Code Online (Sandbox Code Playgroud)

假设您想要该RDD中字符串长度的总和,那么您可以这样做:

1)第一个操作将字符串转换为size(int)并累积size的值.

val stringSizeCummulator: (Int, String) => Int  = (total, string) => total + string.lenght`
Run Code Online (Sandbox Code Playgroud)

2)为加法运算提供ZERO(0)

val ZERO = 0
Run Code Online (Sandbox Code Playgroud)

3)将两个整数加在一起的操作:

val add: (Int, Int) => Int = _ + _
Run Code Online (Sandbox Code Playgroud)

把它们放在一起:

rdd.aggregate(ZERO, stringSizeCummulator, add)
Run Code Online (Sandbox Code Playgroud)

那么,为什么ZERO需要呢?当累加器函数应用于分区的第一个元素时,没有运行总计.ZERO在这里使用.

例如.我的RDD是: - 分区1:["跳转","结束"] - 分区2:["the","wall"]

这将导致:

P1:

  1. stringSizeCummulator(ZERO,"Jump")= 4
  2. stringSizeCummulator(4,"over")= 8

P2:

  1. stringSizeCummulator(ZERO,"the")= 3
  2. stringSizeCummulator(3,"wall")= 7

减少:添加(P1,P2)= 15

  • 关于Python的问题,使用scala的anwser?在pyspark中存在这种事情吗? (3认同)

Joh*_*ght 17

我没有足够的声誉点来评论Maasg之前的回答.实际上零值对于seqop应该是"中性"的,这意味着它不会干扰seqop结果,如0朝向add,或1朝*;

你不应该尝试使用非中性值,因为它可能会被随意应用.此行为不仅与分区数量相关联.

我尝试了问题中所述的相同实验.使用1分区,零值应用3次.有2个分区,6次.有3个分区,9次,这将继续下去.