我总是reduceByKey在需要在RDD中对数据进行分组时使用,因为它在对数据进行混洗之前执行地图侧减少,这通常意味着更少的数据被改组,因此我获得了更好的性能.即使地图侧缩减功能收集所有值并且实际上并没有减少数据量,我仍然使用reduceByKey,因为我假设性能reduceByKey永远不会差groupByKey.但是,我想知道这个假设是否正确,或者确实存在groupByKey应该首选的情况?
zer*_*323 15
我相信,有通过忽略了这个问题的其他方面climbage和eliasah:
如果操作不减少数据量,则必须以一种或另一种方式在语义上等效GroupByKey.让我们假设我们RDD[(Int,String)]:
import scala.util.Random
Random.setSeed(1)
def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")
val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))
Run Code Online (Sandbox Code Playgroud)
我们想要连接给定键的所有字符串.随着groupByKey这是很简单的:
rdd.groupByKey.mapValues(_.mkString(""))
Run Code Online (Sandbox Code Playgroud)
天真的解决方案reduceByKey看起来像这样:
rdd.reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)
它简短易懂,但有两个问题:
String每次都会创建一个新对象*为了解决第一个问题,我们需要一个可变数据结构:
import scala.collection.mutable.StringBuilder
rdd.combineByKey[StringBuilder](
(s: String) => new StringBuilder(s),
(sb: StringBuilder, s: String) => sb ++= s,
(sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)
Run Code Online (Sandbox Code Playgroud)
它仍然暗示了其他一些真正发生的事情并且非常冗长,特别是如果在脚本中重复多次.您当然可以提取匿名函数
val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) =>
sb1.append(sb2)
rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)
Run Code Online (Sandbox Code Playgroud)
但在一天结束时,它仍然意味着需要额外的努力来理解这些代码,增加复杂性并且没有真正的附加价值.我发现特别令人不安的一件事是明确包含可变数据结构.即使Spark几乎处理所有复杂性,也意味着我们不再拥有优雅,引用透明的代码.
我的观点是,如果你真的通过一切手段减少数据量reduceByKey.否则,你会使代码更难编写,更难以分析并获得任何回报.
注意:
这个答案主要针对Scala RDDAPI.当前的Python实现与其JVM对应物完全不同,并且包括优化,这些优化reduceByKey在groupBy类似操作的情况下提供了优于天真实现的显着优势.
对于DatasetAPI,请参阅DataFrame/Dataset groupBy行为/优化.
*有关令人信服的示例,请参阅Scala与Python的Spark性能
| 归档时间: |
|
| 查看次数: |
11962 次 |
| 最近记录: |