afa*_*ncy 1 performance scala apache-spark
我正在写一个关于Spark的程序,我只是按键进行聚合.该计划非常简单.我的输入数据只有2GB,在多核服务器(8核,32GB RAM)上运行,设置为local [2].那就是使用两个内核进行并行化.但是,我发现性能非常糟糕.它几乎需要两个小时才能完成.我正在使用KryoSerializer.我想这可能是由Serializer引起的.如何解决这个问题呢?
val dataPoints = SparkContextManager.textFile(dataLocation)
.map(x => {
val delimited = x.split(",")
(delimited(ColumnIndices.HOME_ID_COLUMN).toLong,
delimited(ColumnIndices.USAGE_READING_COLUMN).toDouble)
})
def process(step: Int): Array[(Long, List[Double])] = {
val resultRDD = new PairRDDFunctions(dataPoints.map(x =>(x._1, List[Double](x._2))))
resultRDD.reduceByKey((x, y) => x++y).collect()
}
Run Code Online (Sandbox Code Playgroud)
输出将是:
1, [1, 3, 13, 21, ..., 111] // The size of list is about 4000
2, [24,34,65, 24, ..., 245]
....
Run Code Online (Sandbox Code Playgroud)
看起来您正在尝试编写一个Spark作业,该作业将与同一个键关联的值组合在一起.PairRDDFunctions有一个groupByKey
执行此操作的操作.Spark的实现groupByKey
利用了几个性能优化来创建更少的临时对象,并通过网络缓存更少的数据(因为每个值都不会包含在a中List
).
如果导入Spark的隐式转换,请使用
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
Run Code Online (Sandbox Code Playgroud)
那么你不需要手动包装你的映射RDD,PairRDDFunctions
以便访问像groupByKey
.这不会对性能产生影响,并使大型Spark程序更易于阅读.
使用groupByKey
,我认为你的process
功能可以改写为
def process(step: Int): Array[(Long, Seq[Double])] = {
dataPoints.groupByKey().collect()
}
Run Code Online (Sandbox Code Playgroud)
我还考虑提高并行度:两者groupByKey
并reduceByKey
采用numTasks
控制减速器数量的可选参数; 默认情况下,星火仅用8个并行任务groupByKey
和reduceByKey
." Spark Scala编程指南"以及Scaladoc中对此进行了描述.
归档时间: |
|
查看次数: |
8801 次 |
最近记录: |