Spark RDD.aggregate vs RDD.reduceByKey?

lan*_*lde 3 apache-spark

我有和RDD [String]每行包含一个单词.目前规模很小,10-20k线,但目标是将其扩展到数亿行.我遇到的问题是,即使对于这个小数据集,执行map/reduceByKey操作的时间也要长得惊人.我执行以下操作:

val wordcount = filtered.map(w => (w,1)).reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)

对于16780线,在2 GHz i7 8 GB RAM机器上需要12321 ms.我发现有一种叫做聚合的方法可能更有效,因此速度更快:

aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
Run Code Online (Sandbox Code Playgroud)

在我的案例中,我无法弄清楚如何实现这一点.我假设它应该是类似的东西

aggregate(collection.immutable.Map)(??)
Run Code Online (Sandbox Code Playgroud)

所以我的问题是

1)使用aggregate而不是reduceByKey是否有意义

2)如果确实如此,它将如何实施?

Wil*_*ire 5

我想,最快的将是countByValue:

返回此RDD中每个唯一值的计数作为(值,计数)对的映射.最终组合步骤在主服务器上本地发生,相当于运行单个reduce任务.

用法很简单:

val wordcount = filtered.countByValue
Run Code Online (Sandbox Code Playgroud)

这个方法的实现应该回答第二个问题:)

顺便说一句,reduceByKey不应该花那么长时间.看起来预计算(即过滤)占用了这12秒的大部分时间.为了验证它,persistRDD在计数之前:

val persisted = filtered.persist
val wordcount = persisted.countByValue
Run Code Online (Sandbox Code Playgroud)