reduceByKey与groupByKey之间的Spark差异与aggregateByKey vs combineByKey之间的差异

Aru*_*n S 53 apache-spark

任何人都可以解释reducebykey,groupbykey,aggregatebykey和combinebykey之间的区别吗?我已经阅读了有关这方面的文件,但无法理解确切的区别?

如果你能用例子解释它会很棒.

Vam*_*ddy 59

groupByKey:

句法:

sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" ") )
                    .map(word => (word,1))
                    .groupByKey()
                    .map((x,y) => (x,sum(y)))
Run Code Online (Sandbox Code Playgroud)

groupByKey可能会导致磁盘无问题,因为数据通过网络发送并在reduce worker上收集.

reduceByKey:

句法:

sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" "))
                    .map(word => (word,1))
                    .reduceByKey((x,y)=> (x+y))
Run Code Online (Sandbox Code Playgroud)

数据在每个分区组合,每个分区的一个密钥只有一个输出通过网络发送.reduceByKey需要将所有值组合成具有完全相同类型的另一个值.

aggregateByKey:

与reduceByKey相同,后者采用初始值.

3个参数作为输入i.初始值ii.组合逻辑iii.序列操作逻辑

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
    val data = sc.parallelize(keysWithValuesList)
    //Create key value pairs
    val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
    val initialCount = 0;
    val addToCounts = (n: Int, v: String) => n + 1
    val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
    val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Run Code Online (Sandbox Code Playgroud)

`

输出: 按键总和结果栏 - > 3 foo - > 5

combineByKey:

3个参数作为输入

  1. 初始值:与aggregateByKey不同,不需要总是传递常量,我们可以传递一个函数来返回一个新值.
  2. 合并功能
  3. 结合功能

示例: `

val result = rdd.combineByKey(
                        (v) => (v,1),
                        ( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
                        ( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2)) 
                        ).map( { case (k,v) => (k,v._1/v._2.toDouble) })
        result.collect.foreach(println)
Run Code Online (Sandbox Code Playgroud)

`

reduceByKey,aggregateByKey,combineByKey 优先于 groupByKey

参考: 避免groupByKey


yog*_*oga 15

虽然reducebykey和groupbykey都会产生相同的答案,但reduceByKey示例在大型数据集上的效果要好得多.这是因为Spark知道它可以在混洗数据之前将输出与每个分区上的公共密钥组合在一起.

另一方面,当调用groupByKey时 - 所有的键值对都被改组了.这是通过网络传输的大量不确定数据.

有关详细信息,请查看以下链接

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

  • 在任何情况下都应该使用groupByKey吗?如果该功能不是关联功能,该怎么办? (2认同)

Bal*_*raj 12

  • groupByKey()只是根据键对数据集进行分组.当RDD尚未分区时,它将导致数据混乱.
  • reduceByKey()就像分组+聚合一样.我们可以说reduceBykey()等于dataset.group(...).reduce(...).与之不同的是,它会减少数据量groupByKey().
  • aggregateByKey()在逻辑上与reduceByKey()相同,但它允许您以不同的类型返回结果.换句话说,它允许您将输入作为类型x,将聚合结果作为类型y.例如(1,2),(1,4)作为输入,(1,"6")作为输出.它还将采用零值,将在每个键的开头应用.

注意: 一个相似之处是它们都是广泛的操作.


Raj*_*hra 5

ReduceByKey reduceByKey(func, [numTasks]) -

数据被合并,以便在每个分区中每个键至少有一个值。然后 shuffle 发生,它通过网络发送到某个特定的 executor 以执行一些操作,例如 reduce。

GroupByKey -groupByKey([numTasks])

它不合并键的值,而是直接进行混洗过程,这里大量数据被发送到每个分区,几乎与初始数据相同。

并且每个键的值的合并是在 shuffle 之后完成的。这里大量数据存储在最终工作节点上,因此导致内存不足问题。

AggregateByKey -aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 与 reduceByKey 类似,但您可以在执行聚合时提供初始值。

用于 reduceByKey

  • reduceByKey 当我们在大数据集上运行时可以使用。

  • reduceByKey 当输入和输出值类型相同时 aggregateByKey

此外,它建议不要使用groupByKey并且更喜欢reduceByKey. 有关详细信息,请参阅此处

您还可以参考此问题以更详细地了解如何reduceByKeyaggregateByKey


Sai*_*tty 5

尽管它们两者都将获得相同的结果,但是两个功能的性能却存在显着差异。reduceByKey()与相比,在更大的数据集上效果更好groupByKey()

在中reduceByKey(),在重新reduceByKey()整理数据之前,将合并同一台计算机上具有相同密钥的对(通过使用传递给的函数)。然后再次调用该函数以减少每个分区中的所有值以产生一个最终结果。

在中groupByKey(),所有键值对均乱码。这是通过网络传输的许多不必要的数据。