谁可以在Spark中对"combineByKey"做出明确的解释?

Bla*_*mba 5 python apache-spark

我正在学习火花,但我无法理解这个功能combineByKey.

>>> data = sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",1)] )
>>> data.combineByKey(lambda v : str(v)+"_", lambda c, v : c+"@"+str(v), lambda c1, c2 : c1+c2).collect()
Run Code Online (Sandbox Code Playgroud)

输出是:

[('A', '1_2_'), ('C', '1_'), ('B', '1_2_')]
Run Code Online (Sandbox Code Playgroud)

首先,我很困惑:@第二步在哪里lambda c, v : c+"@"+v?我无法@从结果中找到.

其次,我阅读了函数描述combineByKey,但我对算法流程感到困惑.

eli*_*sah 9

groupByKey调用不会尝试合并/组合值,因此这是一项昂贵的操作.

因此,combineByKey呼叫就是这样的优化.当使用combineByKey值在每个分区合并为一个值时,每个分区值将合并为单个值.值得注意的是,组合值的类型不必与原始值的类型匹配,并且通常不会与原始值的类型相匹配.该combineByKey函数将3个函数作为参数:

  1. 一个创建组合器的函数.在aggregateByKey函数中,第一个参数只是一个初始零值.在combineByKey我们提供了能接受我们的电流值作为参数,并返回将与其他值合并我们的新值的函数.

  2. 第二个函数是一个合并函数,它接受一个值并将其合并/组合到先前收集的值中.

  3. 第三个函数将合并的值组合在一起.基本上,此函数采用在分区级别生成的新值并将它们组合,直到我们最终得到一个奇异值.

换句话说,要理解combineByKey,考虑它如何处理它处理的每个元素是有用的.作为combineByKey通过元件分区中的推移,每个元件或者具有它之前还没有看到一个键或具有相同的密钥作为前一个元素.

如果它是一个新元素,则combineByKey使用我们提供的函数(称为)createCombiner()为该键创建累加器的初始值.重要的是要注意,这是在第一次在每个分区中找到密钥时发生的,而不是仅在第一次在RDD中找到密钥时发生.

如果它是我们在处理该分区时看到的值,则它将使用提供的函数,mergeValue()以及该键的累加器的当前值和新值.

由于每个分区都是独立处理的,因此我们可以为同一个密钥设置多个累加器.当我们合并每个分区的结果时,如果两个或多个分区具有相同键的累加器,我们使用用户提供的mergeCombiners()函数合并累加器.

参考文献: