I am trying to get the same result as countByKey by using combineByKey.
scala> ordersMap.take(5).foreach(println)
(CLOSED,1)
(PENDING_PAYMENT,2)
(COMPLETE,3)
(CLOSED,4)
(COMPLETE,5)
This is my input, and I want to use combineByKey to reproduce the output of countByKey.
Output of countByKey (correct):
PAYMENT_REVIEW 729
CLOSED 7556
SUSPECTED_FRAUD 1558
PROCESSING 8275
COMPLETE 22899
PENDING 7610
PENDING_PAYMENT 15030
ON_HOLD 3798
CANCELED 1428
I used combineByKey like this:
val combine = ordersMap.combineByKey( x => 1 , (a:Int ,v) => a +1 , (a : Int,v : Int) => a +1 )
But I got unexpected results and I don't know why.
Result from combineByKey:
(PENDING_PAYMENT,7600)
(CLOSED,3878)
(CANCELED,699)
(PAYMENT_REVIEW,368)
(PENDING,3764)
(ON_HOLD,1896)
(PROCESSING,4100)
(SUSPECTED_FRAUD,773)
(COMPLETE,11372)
Thanks.
That's because you're applying the wrong function. The last function passed to combineByKey has to combine two values of the accumulator type (C), which may have been computed on different executors; that's why it is named mergeCombiners. Your version ignores the second combiner's value and just adds 1, so most of each partition's partial count is discarded.
From the documentation:
combineByKey[C](createCombiner: (V) => C,
                mergeValue: (C, V) => C,
                mergeCombiners: (C, C) => C): RDD[(K, C)]
This is how mergeCombiners should be applied:
val combine =
ordersMap.combineByKey(_ => 1 , (a: Int, _) => a + 1, (a: Int, v: Int) => a + v)
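The effect of a wrong mergeCombiners can be reproduced without a cluster. The sketch below is plain Scala with no Spark dependency; `CombineByKeySim` and its `combineByKey` helper are hypothetical names that imitate Spark's per-partition aggregation followed by the cross-partition merge:

```scala
// Plain-Scala simulation of combineByKey semantics (no Spark required).
// Spark first aggregates each partition with createCombiner/mergeValue,
// then merges the per-partition results with mergeCombiners.
object CombineByKeySim {
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Phase 1: aggregate within each "partition"
    val partial: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
      }
    }
    // Phase 2: merge the partial results across partitions
    partial.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.updated(k, a.get(k).map(mergeCombiners(_, c)).getOrElse(c))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // "CLOSED" occurs twice in each of the two partitions: true count is 4
    val parts = Seq(
      Seq(("CLOSED", 1), ("COMPLETE", 2), ("CLOSED", 3)),
      Seq(("CLOSED", 4), ("CLOSED", 5), ("COMPLETE", 6)))
    // Correct: mergeCombiners adds the two partial counts
    val good = combineByKey(parts)((_: Int) => 1, (a: Int, _: Int) => a + 1, (a: Int, b: Int) => a + b)
    // Buggy version from the question: adds 1 instead of the other partial count
    val bad  = combineByKey(parts)((_: Int) => 1, (a: Int, _: Int) => a + 1, (a: Int, _: Int) => a + 1)
    println(good("CLOSED")) // 4
    println(bad("CLOSED"))  // 3 -- the second partition's partial count collapses to +1
  }
}
```

This also suggests why the question's counts are roughly half of countByKey's: each merge step keeps only +1 of the other partition's partial count, which is consistent with the RDD having two partitions.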