何时使用 countByValue 何时使用 map().reduceByKey()

Kay*_*ayV 5 scala word-count apache-spark rdd

我是 Spark 和 Scala 的新手,正在研究一个简单的 wordCount 示例。

因此,我使用 countByValue 如下:

val words = lines.flatMap(x => x.split("\\W+")).map(x => x.toLowerCase())
val wordCount = words.countByValue();
Run Code Online (Sandbox Code Playgroud)

这工作正常。

同样的事情可以实现:

val words = lines.flatMap(x => x.split("\\W+")).map(x => x.toLowerCase())
val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
val sortedWords = wordCounts.map(x => (x._2, x._1)).sortByKey()
Run Code Online (Sandbox Code Playgroud)

这也很好用。

现在,我的问题是什么时候使用哪些方法?哪一个比另一个更受欢迎?

Dar*_*ero 9

至少在 PySpark 中,它们是不同的东西。

countByKey使用 实现reduce,这意味着驱动程序将收集分区的部分结果并自行进行合并。如果你的结果很大,那么驱动就得合并大量的大词典,这会让驱动抓狂。

reduceByKey shuffle key 到不同的executor,在每个worker上做reduce,所以数据量大的时候更有利。

总之,当你的数据量较大,使用mapreduceByKeycollect会使你的驾驶更快乐。如果你的数据很小,countByKey会引入更少的网络流量(少一个阶段)。


the*_*tom 7

这里的例子 - 不是文字,而是数字:

val n = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
val n2 = n.countByValue
Run Code Online (Sandbox Code Playgroud)

返回本地地图:

n: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at command-3737881976236428:1
n2: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)
Run Code Online (Sandbox Code Playgroud)

这是关键的区别。

如果您想要开箱即用的地图,那么这就是您要走的路。

另外,关键是reduce是隐含的,不能被影响,也不需要像reduceByKey那样提供。

当数据量很大时,reduceByKey 有优先权。Map 被完整加载到驱动程序内存中。


Kay*_*ayV 6

添加到上述所有答案,这是我进一步发现的:

  1. CountByValue 返回一个不能以分布式方式使用的映射。

  2. ReduceByKey 返回一个可以以分布式方式进一步使用的 rdd。