在Spark中加入并行化很好吗?

pal*_*ako 15 apache-spark

我在一个小于400MB的非常小的数据集上运行一个相当小的Spark程序,其中包含一些map和reduceByKey操作.

在某些时候,我有一个我要排序的元组的RDD,我调用sortByKey.这是我程序中最慢的部分.其他一切似乎几乎立即运行,但这需要20秒.

问题是,我的笔记本电脑以及AWS m3.large机器群集需要20秒.我尝试过1,2和3个奴隶,执行时间的差异非常小.Ganglia和spark web控制台表明CPU和内存正在被用于所有从站的最大容量,所以我认为配置是可以的.

我还在我预期之前发现了执行问题,但后来我读到了这个线程,它指向Sp​​ark中的一个未解决的问题.我不认为这完全相关.

它是sortByKey固有的慢,无论我添加多少节点,它将决定我的程序的最短执行时间?希望不是,而且我只是做错了什么并且可以修复.

编辑

事实证明,我所看到的与我发布的链接有关.sortByKey恰好是第一个动作(记录为转换),看起来好像程序在排序时很慢,但实际排序速度非常快.问题出在先前的连接操作中.

我说的所有内容都适用于通过连接更改排序.当我添加更多节点(或numTask到连接函数)时,为什么执行时间不会下降,为什么它甚至不比普通的SQL连接更好?我之前发现其他人有这个问题,但除了建议调整序列化之外没有答案,我真的不认为是我的情况.

Eri*_*oom 17

连接本质上是一个繁重的操作,因为具有相同键的值必须移动到同一台机器(网络混洗).添加更多节点只会增加额外的IO开销.

我能想到两件事:

选项1

如果要使用较小的数据集加入大型数据集,则可以通过广播较小的数据集来获得回报:

val large = sc.textFile("large.txt").map(...) 
val smaller = sc.textFile("smaller.txt").collect().toMap() 
val bc = sc.broadcast(smaller)
Run Code Online (Sandbox Code Playgroud)

然后做一个'手动加入':

large.map(x => (x.value, bc.value(x.value)))
Run Code Online (Sandbox Code Playgroud)

这在Advanced Spark演示中有更详细的描述.

选项2

您可以使用与大数据集相同的分区来重新分区小数据集(即确保类似的键位于同一台计算机上).因此,调整小集的分区以匹配大集的分区.

这将仅触发小套装的随机播放.一旦分区正确,连接应该相当快,因为​​它将在每个群集节点上本地运行.