How to use combineByKey?

Ana*_*eep 3 scala apache-spark

I am trying to get the same result as countByKey by using combineByKey.

scala> ordersMap.take(5).foreach(println)
(CLOSED,1)
(PENDING_PAYMENT,2)
(COMPLETE,3)
(CLOSED,4)
(COMPLETE,5)

This is my input. I want to use combineByKey to get the same output as countByKey.

Output of countByKey (correct):

PAYMENT_REVIEW 729
CLOSED 7556
SUSPECTED_FRAUD 1558
PROCESSING 8275
COMPLETE 22899
PENDING 7610
PENDING_PAYMENT 15030
ON_HOLD 3798
CANCELED 1428

I used combineByKey like this:

val combine = ordersMap.combineByKey(  x => 1 , (a:Int ,v) => a +1 , (a : Int,v : Int) => a +1  )

But I got unexpected results, and I don't know why.

Result from combineByKey:

(PENDING_PAYMENT,7600)
(CLOSED,3878)
(CANCELED,699)
(PAYMENT_REVIEW,368)
(PENDING,3764)
(ON_HOLD,1896)
(PROCESSING,4100)
(SUSPECTED_FRAUD,773)
(COMPLETE,11372)

Thanks

Yuv*_*kov 5

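That's because you are passing the wrong function. The last function passed to combineByKey must combine two values of the accumulator type (C), which may have been computed on different executors. That's why the parameter is called mergeCombiners. Your version, (a : Int, v : Int) => a + 1, throws away the second partial count and adds only 1, so each key keeps roughly the count from a single partition. That is why your totals come out at about half of the countByKey totals.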
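To make the difference concrete, here is a small Spark-free sketch that merges two partial counts for one key the way mergeCombiners would. The partial counts (3 and 4), the object name and the assumption of exactly two partitions are hypothetical, chosen only for illustration.

```scala
object MergeCombinersDemo {
  // The question's mergeCombiners: it ignores the second partial count.
  val questionMerge: (Int, Int) => Int = (a, _) => a + 1
  // The answer's mergeCombiners: it adds the two partial counts.
  val answerMerge: (Int, Int) => Int = (a, b) => a + b

  // Hypothetical partial counts for one key, as if computed on two partitions.
  val partialCounts: Seq[Int] = Seq(3, 4)

  def main(args: Array[String]): Unit = {
    println(partialCounts.reduce(questionMerge)) // 4: the first partition's 3, plus 1
    println(partialCounts.reduce(answerMerge))   // 7: the true count
  }
}
```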

Documentation:

combineByKey[C](createCombiner: (V) => C,
                mergeValue: (C, V) => C,
                mergeCombiners: (C, C) => C): RDD[(K, C)]
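To see how the three functions cooperate, here is a minimal model of that contract on plain Scala collections, where each inner Seq stands in for one partition. LocalCombineByKey and its combineByKey method are hypothetical names; this is only a sketch of the semantics described by the signature, not Spark's implementation (Spark also shuffles data between the two phases).

```scala
object LocalCombineByKey {
  // Hypothetical, Spark-free model of combineByKey semantics.
  // Each inner Seq plays the role of one partition.
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Phase 1: inside each partition, createCombiner starts a key's
    // accumulator and mergeValue folds further values into it.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    // Phase 2: accumulators for the same key coming from different
    // partitions are combined with mergeCombiners.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (merged, (k, c)) =>
        merged.get(k) match {
          case Some(prev) => merged.updated(k, mergeCombiners(prev, c))
          case None       => merged.updated(k, c)
        }
      }
    }
  }
}
```

For example, if the five sample records from the question were split into two partitions, (CLOSED,1), (PENDING_PAYMENT,2), (COMPLETE,3) and (CLOSED,4), (COMPLETE,5), then calling it with `_ => 1`, `(a: Int, _: Int) => a + 1` and `(a: Int, b: Int) => a + b` gives CLOSED = 2, PENDING_PAYMENT = 1, COMPLETE = 2.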

Here is how you should write mergeCombiners:

val combine = 
  ordersMap.combineByKey(_ => 1 , (a: Int, _) => a + 1, (a: Int, v: Int) => a + v)
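One way to convince yourself that these three functions really compute a count is to check that the result does not depend on how a key's values are split across partitions. The sketch below is plain Scala (no Spark); CountCombinersCheck, countAcross and splitInvariant are hypothetical names, and it assumes the key has at least one value.

```scala
object CountCombinersCheck {
  // The answer's three functions as named values; V = Int because the
  // question's ordersMap holds (status, orderId) pairs.
  val createCombiner: Int => Int = _ => 1
  val mergeValue: (Int, Int) => Int = (count, _) => count + 1
  val mergeCombiners: (Int, Int) => Int = (a, b) => a + b

  // Counts one key's values after they have been split into "partitions".
  // Empty partitions are skipped: no combiner is created for a key that has
  // no values in a partition. Assumes at least one partition is non-empty.
  def countAcross(partitions: Seq[Seq[Int]]): Int =
    partitions
      .filter(_.nonEmpty)
      .map(p => p.tail.foldLeft(createCombiner(p.head))(mergeValue))
      .reduce(mergeCombiners)

  // True if every two-way split of a non-empty `values` yields the plain count.
  def splitInvariant(values: Seq[Int]): Boolean =
    (0 to values.size).forall { i =>
      val (left, right) = values.splitAt(i)
      countAcross(Seq(left, right)) == values.size
    }
}
```

If you replace mergeCombiners with the question's `(a, _) => a + 1`, the check fails: splitting `Seq(1, 4, 7)` into `Seq(1)` and `Seq(4, 7)` gives 1 + 1 = 2 instead of 3.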