use*_*186 54 scala apache-spark rdd
我是Spark和Scala的新手.我对reduceByKey函数在Spark中的工作方式感到困惑.假设我们有以下代码:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)
map函数是明确的:s是键,它指向行,data.txt而1是值.
但是,我没有得到reduceByKey如何在内部工作?"a"指向钥匙吗?或者,"a"指向"s"吗?那么什么代表a + b?它们是如何填满的?
Jus*_*ony 87
让我们分解为离散的方法和类型.这通常暴露了新开发者的错综复杂:
pairs.reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)
变
pairs.reduceByKey((a: Int, b: Int) => a + b)
Run Code Online (Sandbox Code Playgroud)
并重命名变量使它更明确一些
pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)
Run Code Online (Sandbox Code Playgroud)
因此,我们现在可以看到,我们只是为给定键获取累计值,并将其与该键的下一个值相加.现在,让我们进一步分解,以便我们理解关键部分.所以,让我们更像这样的方法:
pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
//Turn the accumulated value into a true key->value mapping
val accumAsMap = accumulatedValue.toMap
//Try to get the key's current value if we've already encountered it
accumAsMap.get(currentValue._1) match {
//If we have encountered it, then add the new value to the existing value and overwrite the old
case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
//If we have NOT encountered it, then simply add it to the list
case None => currentValue :: accumulatedValue
}
})
Run Code Online (Sandbox Code Playgroud)
因此,您可以看到reduce ByKey采用寻找密钥并跟踪它的样板,因此您不必担心管理该部分.
更深入,更真实,如果你想
所有这一切,这是一个简化的版本,因为这里有一些优化.此操作是关联的,因此火花引擎将首先在本地执行这些减少(通常称为地图侧减少),然后再次在驾驶员处执行.这节省了网络流量; 而不是发送所有数据并执行操作,它可以尽可能小地减少它,然后通过线路发送减少量.
maa*_*asg 47
该reduceByKey功能的一个要求是必须是关联的.为了建立一些关于如何reduceByKey工作的直觉,让我们首先看看关联关联函数如何帮助我们进行并行计算:

正如我们所看到的,我们可以分解原始集合,并通过应用关联函数,我们可以累积总数.连续的情况是微不足道的,我们习惯了它:1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10.
关联性允许我们按顺序和并行使用相同的功能.reduceByKey使用该属性计算RDD的结果,RDD是由分区组成的分布式集合.
请考虑以下示例:
// collection of the form ("key",1),("key,2),...,("key",20) split among 4 partitions
val rdd =sparkContext.parallelize(( (1 to 20).map(x=>("key",x))), 4)
rdd.reduceByKey(_ + _)
rdd.collect()
> Array[(String, Int)] = Array((key,210))
Run Code Online (Sandbox Code Playgroud)
在spark中,数据被分配到分区中.对于下一个图示,(4)分区在左侧,用细线包围.首先,我们在分区中按顺序将函数本地应用于每个分区,但我们并行运行所有4个分区.然后,通过再次应用相同的函数来聚合每个局部计算的结果,最后得到结果.

reduceByKey是aggregateByKey aggregateByKey 2个函数的特化:一个应用于每个分区(顺序),一个应用于每个分区的结果(并行).reduceByKey在两种情况下使用相同的关联函数:对每个分区进行顺序计算,然后将这些结果组合成最终结果,如我们在此处所示.
在你的例子中
val counts = pairs.reduceByKey((a,b) => a+b)
Run Code Online (Sandbox Code Playgroud)
a并且b都是元组中的Int累加器.将使用具有相同值的两个元组并将其值用作并生成新元组.重复此操作,直到每个键只有一个元组._2pairsreduceKeys_2abTuple[String,Int]s
与非Spark(或者,实际上,非并行)不同reduceByKey,其中第一个元素始终是累加器而第二个元素值reduceByKey以分布式方式运行,即每个节点将其元组集合减少为唯一键控元组的集合然后从多个节点中减少元组,直到有一组最终唯一键控的元组.这意味着节点的结果会减少,a并且b表示已经减少的累加器.