Edw*_*Guo 40 distributed-computing apache-spark
假设我在3个节点上有一个分发系统,我的数据分布在这些节点中.例如,我有一个test.csv文件,它存在于所有3个节点上,它包含2列:
**row | id, c.**
---------------
row1 | k1 , c1
row2 | k1 , c2
row3 | k1 , c3
row4 | k2 , c4
row5 | k2 , c5
row6 | k2 , c6
row7 | k3 , c7
row8 | k3 , c8
row9 | k3 , c9
row10 | k4 , c10
row11 | k4 , c11
row12 | k4 , c12
Run Code Online (Sandbox Code Playgroud)
然后我使用SparkContext.textFile将文件读出为rdd等等.据我所知,每个spark worker节点都会读取文件中的一部分.所以现在让我们说每个节点都会存储:
我的问题是,假设我想对这些数据进行计算,我需要将一个键组合在一起,因此键值对将是[k1 [{k1 c1} {k1 c2} {k1 c3}]]..
等等.
有一个函数叫做groupByKey()
使用非常昂贵,aggregateByKey()
建议使用.所以我不知道怎么做groupByKey()
和aggregateByKey()
引擎盖下的作品?有人可以使用我上面提供的例子来解释吗?在洗牌之后,每个节点上的行都在哪里?
Ant*_*oni 68
aggregateByKey()与reduceByKey完全不同.会发生什么是reduceByKey是aggregateByKey的特例.
aggregateByKey()将组合特定键的值,此类组合的结果可以是您指定的任何对象.您必须指定值如何在一个分区(在同一节点中执行)内组合("添加")以及如何组合来自不同分区(可能在不同节点中)的结果.reduceByKey是一种特殊情况,在某种意义上,组合的结果(例如,和)与值的类型相同,并且当从不同分区组合时的操作也与组合内部的值时的操作相同.划分.
一个例子:想象一下你有一对配对列表.你并行化它:
val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
Run Code Online (Sandbox Code Playgroud)
现在你想用键"组合"它们产生一个总和.在这种情况下,reduceByKey和aggregateByKey是相同的:
val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))
//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))
Run Code Online (Sandbox Code Playgroud)
现在,假设您希望聚合是一组值,这是一个不同的类型,值是整数(整数的总和也是整数):
import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])] =Array((b,Set(7)), (a,Set(1, 5, 3)))
Run Code Online (Sandbox Code Playgroud)
aar*_*man 51
aggregateByKey()
几乎完全相同reduceByKey()
(都combineByKey()
在幕后调用),除了你给出一个起始值aggregateByKey()
.大多数人都熟悉reduceByKey()
,所以我会在解释中使用它.
原因reduceByKey()
要好得多,因为它使用了称为组合器的MapReduce功能.任何类似+
或*
可以以这种方式使用的函数,因为它被调用的元素的顺序无关紧要.这允许Spark使用相同的密钥开始"减少"值,即使它们并非全部在同一分区中.
另一方面,groupByKey()
由于您编写了一个具有Iterable的函数,因此您可以将所有元素拉入数组中,从而为您提供更多功能.然而,它是低效的,因为它可以工作,整套(K,V,)
对必须在一个分区中.
在reduce类型操作上移动数据的步骤通常称为shuffle,在最简单的级别上,数据被分区到每个节点(通常使用散列分区器),然后在每个节点上进行排序.
归档时间: |
|
查看次数: |
46486 次 |
最近记录: |