createCombiner,mergeValue,mergeCombiner如何在Spark中使用CombineByKey(使用Scala)

Ran*_*icx 19 apache-spark

我试图了解每个步骤的combineByKeys工作原理.

有人可以帮我理解下面的RDD吗?

val rdd = sc.parallelize(List(
  ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),("B", 4), 
  ("B", 10), ("B", 11), ("B", 20), ("B", 25),("C", 32), ("C", 91),
   ("C", 122), ("C", 3), ("C", 55)), 2)

rdd.combineByKey(
    (x:Int) => (x, 1),
    (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
    (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
Run Code Online (Sandbox Code Playgroud)

Jus*_*ony 55

首先,让我们打破这个过程:

首先,createCombiner如果找不到密钥,则在分区上首次遇到密钥时创建初始值(组合器)--> (firstValueEncountered, 1).因此,这仅仅是使用第一个值和1的密钥计数器初始化元组.

然后,mergeValue仅在已为此分区上的已找到密钥创建组合器(在我们的示例中为元组)时才会触发--> (existingTuple._1 + subSequentValue, existingTuple._2 + 1).这会将现有元组的值(在第一个插槽中)与新遇到的值相加,并获取现有元组的计数器(在第二个插槽中)并递增它.所以,我们是

然后,mergeCombiner获取在每个分区上创建的组合器(元组)并将它们合并在一起--> (tupleFromPartition._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2).这只是将每个元组的值和计数器一起添加到一个元组中.

然后,让我们将您的数据子集拆分为分区并查看其中的操作:

("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)
Run Code Online (Sandbox Code Playgroud)

分区1

A=3 --> createCombiner(3) ==> accum[A] = (3, 1)
A=9 --> mergeValue(accum[A], 9) ==> accum[A] = (3 + 9, 1 + 1)
B=11 --> createCombiner(11) ==> accum[B] = (11, 1)
Run Code Online (Sandbox Code Playgroud)

分区2

A=12 --> createCombiner(12) ==> accum[A] = (12, 1)
B=4 --> createCombiner(4) ==> accum[B] = (4, 1)
B=10 --> mergeValue(accum[B], 10) ==> accum[B] = (4 + 10, 1 + 1)
Run Code Online (Sandbox Code Playgroud)

将分区合并在一起

A ==> mergeCombiner((12, 2), (12, 1)) ==> (12 + 12, 2 + 1)
B ==> mergeCombiner((11, 1), (14, 2)) ==> (11 + 14, 1 + 2)
Run Code Online (Sandbox Code Playgroud)

所以,你应该得到一个像这样的数组:

Array((A, (24, 3)), (B, (25, 3)))
Run Code Online (Sandbox Code Playgroud)

  • 我假设一个随机分区器.如果它是一个已排序的分区程序,那么它应该如您所愿 (3认同)
  • 真棒插图贾斯汀..! (2认同)