groupByKey是否比reduceByKey更受欢迎

Gle*_*olt 14 apache-spark rdd

我总是reduceByKey在需要在RDD中对数据进行分组时使用,因为它在对数据进行混洗之前执行地图侧减少,这通常意味着更少的数据被改组,因此我获得了更好的性能.即使地图侧缩减功能收集所有值并且实际上并没有减少数据量,我仍然使用reduceByKey,因为我假设性能reduceByKey永远不会差groupByKey.但是,我想知道这个假设是否正确,或者确实存在groupByKey应该首选的情况?

zer*_*323 15

我相信,有通过忽略了这个问题的其他方面climbageeliasah:

  • 代码可读性
  • 代码可维护性
  • 代码库大小

如果操作不减少数据量,则必须以一种或另一种方式在语义上等效GroupByKey.让我们假设我们RDD[(Int,String)]:

import scala.util.Random
Random.setSeed(1)

def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")

val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))
Run Code Online (Sandbox Code Playgroud)

我们想要连接给定键的所有字符串.随着groupByKey这是很简单的:

rdd.groupByKey.mapValues(_.mkString(""))
Run Code Online (Sandbox Code Playgroud)

天真的解决方案reduceByKey看起来像这样:

rdd.reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)

它简短易懂,但有两个问题:

  • 是非常低效的,因为它String每次都会创建一个新对象*
  • 表明您执行的操作比实际操作更便宜,特别是如果您只分析DAG或调试字符串

为了解决第一个问题,我们需要一个可变数据结构:

import scala.collection.mutable.StringBuilder

rdd.combineByKey[StringBuilder](
    (s: String) => new StringBuilder(s),
    (sb: StringBuilder, s: String) => sb ++= s,
    (sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)
Run Code Online (Sandbox Code Playgroud)

它仍然暗示了其他一些真正发生的事情并且非常冗长,特别是如果在脚本中重复多次.您当然可以提取匿名函数

val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) => 
  sb1.append(sb2)

rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)
Run Code Online (Sandbox Code Playgroud)

但在一天结束时,它仍然意味着需要额外的努力来理解这些代码,增加复杂性并且没有真正的附加价值.我发现特别令人不安的一件事是明确包含可变数据结构.即使Spark几乎处理所有复杂性,也意味着我们不再拥有优雅,引用透明的代码.

我的观点是,如果你真的通过一切手段减少数据量reduceByKey.否则,你会使代码更难编写,更难以分析并获得任何回报.

注意:

这个答案主要针对Scala RDDAPI.当前的Python实现与其JVM对应物完全不同,并且包括优化,这些优化reduceByKeygroupBy类似操作的情况下提供了优于天真实现的显着优势.

对于DatasetAPI,请参阅DataFrame/Dataset groupBy行为/优化.


*有关令人信服的示例,请参阅Scala与Python的Spark性能


Mik*_*ark 7

reduceByKey并且groupByKey都使用combineByKey不同的组合/合并语义.

我看到的关键区别是groupByKey将flag(mapSideCombine=false)传递给shuffle引擎.从问题SPARK-772来看,这是一个提示,当数据大小不会改变时,shuffle引擎不能运行mapside combiner.

所以我想说如果你试图reduceByKey用来复制groupByKey,你可能会看到轻微的性能损失.