我有和RDD [String]每行包含一个单词.目前规模很小,10-20k线,但目标是将其扩展到数亿行.我遇到的问题是,即使对于这个小数据集,执行map/reduceByKey操作的时间也要长得惊人.我执行以下操作:
val wordcount = filtered.map(w => (w,1)).reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)
对于16780线,在2 GHz i7 8 GB RAM机器上需要12321 ms.我发现有一种叫做聚合的方法可能更有效,因此速度更快:
aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
Run Code Online (Sandbox Code Playgroud)
在我的案例中,我无法弄清楚如何实现这一点.我假设它应该是类似的东西
aggregate(collection.immutable.Map)(??)
Run Code Online (Sandbox Code Playgroud)
所以我的问题是
1)使用aggregate而不是reduceByKey是否有意义
2)如果确实如此,它将如何实施?
我想,最快的将是countByValue:
返回此RDD中每个唯一值的计数作为(值,计数)对的映射.最终组合步骤在主服务器上本地发生,相当于运行单个reduce任务.
用法很简单:
val wordcount = filtered.countByValue
Run Code Online (Sandbox Code Playgroud)
这个方法的实现应该回答第二个问题:)
顺便说一句,reduceByKey不应该花那么长时间.看起来预计算(即过滤)占用了这12秒的大部分时间.为了验证它,persistRDD在计数之前:
val persisted = filtered.persist
val wordcount = persisted.countByValue
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4401 次 |
| 最近记录: |