I'm trying to understand how combineByKey works at each step.
Can someone walk me through it for the RDD below?
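// Assumes an existing SparkContext `sc`, e.g. in spark-shell
val rdd = sc.parallelize(List(
  ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5), ("B", 4),
  ("B", 10), ("B", 11), ("B", 20), ("B", 25), ("C", 32), ("C", 91),
  ("C", 122), ("C", 3), ("C", 55)), 2) // 2 partitions
val sumCount = rdd.combineByKey(
  (x: Int) => (x, 1),                                    // createCombiner
  (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1), // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) =>
    (acc1._1 + acc2._1, acc1._2 + acc2._2))              // mergeCombiners
sumCount.collect().foreach(println)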
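As a reference point, the three functions together compute a (sum, count) pair per key. A minimal sketch of the same result using plain Scala collections instead of Spark (an assumption made so it runs without a SparkContext; `data` and `expected` are illustrative names):

```scala
// Plain-Scala cross-check (no Spark): the (sum, count) per key that the
// combineByKey call should produce for the full list.
val data = List(
  ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5), ("B", 4),
  ("B", 10), ("B", 11), ("B", 20), ("B", 25), ("C", 32), ("C", 91),
  ("C", 122), ("C", 3), ("C", 55))

// Group the records by key, then sum the values and count them.
val expected: Map[String, (Int, Int)] =
  data.groupBy(_._1).map { case (k, pairs) =>
    val values = pairs.map(_._2)
    (k, (values.sum, values.size))
  }

expected.toSeq.sortBy(_._1).foreach(println)
// (A,(29,5))
// (B,(70,5))
// (C,(303,5))
```

Because summing and counting do not depend on how the records are split, this is what combineByKey should return for the full list regardless of the number of partitions.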
Answer by Jus*_*ony (score: 55)
Let's break down the process:
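First, createCombiner
creates the initial value (the combiner) for a key the first time that key is encountered on a partition, i.e. when no combiner exists for it there yet --> (firstValueEncountered, 1). So this merely initializes a tuple with the first value and a counter of 1 for the key.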
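A plain-Scala sketch of this rule, with no Spark involved. `firstSeen` is a hypothetical helper that applies only createCombiner, to show which value becomes the initial combiner for each key on a single partition:

```scala
import scala.collection.mutable

// createCombiner: turn the first value seen for a key into (value, count = 1).
val createCombiner: Int => (Int, Int) = v => (v, 1)

// Hypothetical helper: for ONE partition, call createCombiner only for the
// first value of each key; later values for that key are skipped here.
def firstSeen(partition: Seq[(String, Int)]): Map[String, (Int, Int)] = {
  val acc = mutable.LinkedHashMap.empty[String, (Int, Int)]
  for ((k, v) <- partition if !acc.contains(k))
    acc(k) = createCombiner(v)
  acc.toMap
}

println(firstSeen(Seq(("A", 3), ("A", 9), ("B", 11)))) // Map(A -> (3,1), B -> (11,1))
```

In combineByKey itself, the skipped values (like A=9 above) are not dropped; they are passed to mergeValue instead.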
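Then, mergeValue
is triggered only if a combiner (a tuple, in our case) has already been created for the key on this partition --> (existingTuple._1 + subSequentValue, existingTuple._2 + 1). This adds the newly encountered value to the existing tuple's value (in the first slot) and increments the existing tuple's counter (in the second slot). So, on each partition, we are keeping a running sum and count per key.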
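Within a single partition, createCombiner followed by repeated mergeValue calls behaves like a left fold over that key's values in encounter order. A minimal plain-Scala sketch, where `combineOnPartition` is a hypothetical helper that assumes a non-empty list of values for one key:

```scala
// createCombiner and mergeValue, with the same shapes as in the question.
val createCombiner: Int => (Int, Int) = v => (v, 1)
val mergeValue: ((Int, Int), Int) => (Int, Int) =
  (acc, v) => (acc._1 + v, acc._2 + 1)

// Hypothetical helper: values of ONE key on ONE partition, in encounter order
// (must be non-empty). The first value goes through createCombiner,
// every later value through mergeValue.
def combineOnPartition(values: Seq[Int]): (Int, Int) =
  values.tail.foldLeft(createCombiner(values.head))(mergeValue)

println(combineOnPartition(Seq(3, 9)))  // (12,2)
println(combineOnPartition(Seq(4, 10))) // (14,2)
```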
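Then, mergeCombiners
takes the combiners (tuples) created on each partition and merges them together --> (tupleFromPartition1._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2). This simply adds the values and the counters of the tuples together into a single tuple.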
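mergeCombiners only ever sees combiners, never raw values. A plain-Scala sketch that reduces the per-partition tuples from the partition walkthrough in this answer (the `perPartitionA` and `perPartitionB` values are copied from it; no Spark involved):

```scala
// mergeCombiners: add the sums and add the counts of two partial results.
val mergeCombiners: ((Int, Int), (Int, Int)) => (Int, Int) =
  (a, b) => (a._1 + b._1, a._2 + b._2)

// One (sum, count) combiner per partition that contained the key.
val perPartitionA = Seq((12, 2), (12, 1))
val perPartitionB = Seq((11, 1), (14, 2))

println(perPartitionA.reduce(mergeCombiners)) // (24,3)
println(perPartitionB.reduce(mergeCombiners)) // (25,3)
```

Since addition is associative and commutative, the order in which the partition results are merged does not change the final tuple.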
Next, let's split a subset of your data into two partitions (an illustrative split) and look at the operations on each:
("A", 3), ("A", 9), ("A", 12), ("B", 4), ("B", 10), ("B", 11)
Partition 1:
A=3 --> createCombiner(3) ==> accum[A] = (3, 1)
A=9 --> mergeValue(accum[A], 9) ==> accum[A] = (3 + 9, 1 + 1)
B=11 --> createCombiner(11) ==> accum[B] = (11, 1)
Partition 2:
A=12 --> createCombiner(12) ==> accum[A] = (12, 1)
B=4 --> createCombiner(4) ==> accum[B] = (4, 1)
B=10 --> mergeValue(accum[B], 10) ==> accum[B] = (4 + 10, 1 + 1)
Merging the two partitions:
A ==> mergeCombiners((12, 2), (12, 1)) ==> (12 + 12, 2 + 1)
B ==> mergeCombiners((11, 1), (14, 2)) ==> (11 + 14, 1 + 2)
So, for this subset, after collect() you should get an array like this (the key order may vary):
Array((A, (24, 3)), (B, (25, 3)))
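Putting the three steps together, the whole walkthrough can be reproduced with a small single-machine simulation. This is a sketch of the semantics only, not Spark's implementation (no shuffle, no spilling to disk); `combineByKeyLocal` is a hypothetical name, and the two partitions are the illustrative ones used in the walkthrough:

```scala
import scala.collection.mutable

// Hypothetical local stand-in for combineByKey: each inner Seq is one partition.
def combineByKeyLocal[K, V, C](partitions: Seq[Seq[(K, V)]])(
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Map side: build one accumulator map per partition.
  val perPartition = partitions.map { part =>
    val acc = mutable.LinkedHashMap.empty[K, C]
    for ((k, v) <- part) {
      val next = acc.get(k) match {
        case Some(c) => mergeValue(c, v)  // key already seen on this partition
        case None    => createCombiner(v) // first time on this partition
      }
      acc(k) = next
    }
    acc
  }
  // Reduce side: merge the per-partition combiners key by key.
  val result = mutable.LinkedHashMap.empty[K, C]
  for (acc <- perPartition; (k, c) <- acc)
    result(k) = result.get(k).fold(c)(prev => mergeCombiners(prev, c))
  result.toMap
}

val partitions = Seq(
  Seq(("A", 3), ("A", 9), ("B", 11)),  // partition 1 in the walkthrough
  Seq(("A", 12), ("B", 4), ("B", 10))) // partition 2 in the walkthrough

val combined = combineByKeyLocal(partitions)(
  (x: Int) => (x, 1),
  (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))

println(combined) // Map(A -> (24,3), B -> (25,3))
```

Moving a record to the other partition changes which calls are createCombiner versus mergeValue, but not the final (sum, count) for each key.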