Spark 性能 - 如何并行化大循环?

wdz*_*wdz 2 parallel-processing performance scala apache-spark

我有一个包含 8000 个循环的 Spark 应用程序,它在 5 个节点的集群上运行。每个节点有 125GB 内存和 32 个内核。相关代码如下所示:

for (m <- 0 until deviceArray.size) { // there are 1000 device 
  var id = deviceArray(m)

  for (t <- 1 to timePatterns) { // there are 8 time patterns
     var hrpvData = get24HoursPVF(dataDF, id, t).cache()

  var hrpvDataZI = hrpvData.zipWithIndex

  var clustersLSD = runKMeans(hrpvData, numClusters, numIterations)

  var clusterPVPred = hrpvData.map(x => clustersLSD.predict(x))
  var clusterPVMap = hrpvDataZI.zip(clusterPVPred)

  var pvhgmRDD = clusterPVMap.map{r => (r._2, r._1._2)}.groupByKey

  var arrHGinfo = pvhgmRDD.collect 

  // Post process data 
  // .....

  hrpvData.unpersist()
  }
}
Run Code Online (Sandbox Code Playgroud)

函数调用get24HoursPVF()为 k-means 准备特征向量,大约需要 40 秒。每个循环大约需要 50 秒才能完成使用集群。我的数据大小为 2 到 3 GB(从表中读取)。给定 8000 个循环,运行这个 Spark 应用程序的总时间是不可接受的(8000x50s)。

既然每个设备都是独立的,那么有没有办法并行8000次迭代呢?或者如何利用集群来解决总运行时间长的问题?Scala Future 将无法工作,因为它只是几乎同时提交作业,但 Spark 不会同时运行这些作业。

Ewa*_*ith 5

除了 for 循环之外,您的代码中还有 2 个最慢的 Spark API 调用 -groupByKeycollect.

groupByKey 几乎不应该使用,而是查看reduceByKey,查看此Databricks 博客了解更多详细信息。

collect 将该 RDD 中的所有数据传输到驱动程序节点上的数组,除非这是少量数据,否则会对性能产生相当大的影响。

在 for 循环中,我不是特别熟悉您要执行的操作,但是在

var hrpvData = get24HoursPVF(dataDF, id, t).cache()
Run Code Online (Sandbox Code Playgroud)

您正在为每个 id 和 t 值构建和缓存一个新的数据框。我不确定为什么您不能在开始时只构建一个包含 id 和 t 的每个变体的单个数据帧,然后在整个数据帧上运行 zipWithIndex、map 等?

  • 如果您需要 group 而不是 group 后跟 some-reduction 那么 groupBy 键也别无选择。如果您的意思是reduceByKey_,则_Don't use groupByKey_ 比_Avoid GroupByKey_ 更重要。而且 groupByKey 没有无随机播放的替代方案 - 它只有 -ess-to-shuffle 替代方案。 (2认同)