Spark中的低性能reduceByKey()

afa*_*ncy 1 performance scala apache-spark

我正在写一个关于Spark的程序,我只是按键进行聚合.该计划非常简单.我的输入数据只有2GB,在多核服务器(8核,32GB RAM)上运行,设置为local [2].那就是使用两个内核进行并行化.但是,我发现性能非常糟糕.它几乎需要两个小时才能完成.我正在使用KryoSerializer.我想这可能是由Serializer引起的.如何解决这个问题呢?

  val dataPoints = SparkContextManager.textFile(dataLocation)
        .map(x => {
            val delimited = x.split(",")
            (delimited(ColumnIndices.HOME_ID_COLUMN).toLong, 
                delimited(ColumnIndices.USAGE_READING_COLUMN).toDouble)
        })

def process(step: Int): Array[(Long, List[Double])] = {
  val resultRDD = new PairRDDFunctions(dataPoints.map(x =>(x._1, List[Double](x._2))))
  resultRDD.reduceByKey((x, y) => x++y).collect()
}
Run Code Online (Sandbox Code Playgroud)

输出将是:

1, [1, 3, 13, 21, ..., 111] // The size of list is about 4000
2, [24,34,65, 24, ..., 245]
....
Run Code Online (Sandbox Code Playgroud)

Jos*_*sen 9

看起来您正在尝试编写一个Spark作业,该作业将与同一个键关联的值组合在一起.PairRDDFunctions有一个groupByKey执行此操作的操作.Spark的实现groupByKey利用了几个性能优化来创建更少的临时对象,并通过网络缓存更少的数据(因为每个值都不会包含在a中List).

如果导入Spark的隐式转换,请使用

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
Run Code Online (Sandbox Code Playgroud)

那么你不需要手动包装你的映射RDD,PairRDDFunctions以便访问像groupByKey.这不会对性能产生影响,并使大型Spark程序更易于阅读.

使用groupByKey,我认为你的process功能可以改写为

def process(step: Int): Array[(Long, Seq[Double])] = {
  dataPoints.groupByKey().collect()
}
Run Code Online (Sandbox Code Playgroud)

我还考虑提高并行度:两者groupByKeyreduceByKey采用numTasks控制减速器数量的可选参数; 默认情况下,星火仅用8个并行任务groupByKeyreduceByKey." Spark Scala编程指南"以及Scaladoc中对此进行了描述.

  • 不同意,DataBricks编写了一本关于Spark最佳实践的电子书,首先它说避免groupByKey,只需使用reduceByKey,请参阅:http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices /prefer_reducebykey_over_groupbykey.html (6认同)