wdz*_*wdz 2 parallel-processing performance scala apache-spark
我有一个包含 8000 个循环的 Spark 应用程序,它在 5 个节点的集群上运行。每个节点有 125GB 内存和 32 个内核。相关代码如下所示:
for (m <- 0 until deviceArray.size) { // there are 1000 device
var id = deviceArray(m)
for (t <- 1 to timePatterns) { // there are 8 time patterns
var hrpvData = get24HoursPVF(dataDF, id, t).cache()
var hrpvDataZI = hrpvData.zipWithIndex
var clustersLSD = runKMeans(hrpvData, numClusters, numIterations)
var clusterPVPred = hrpvData.map(x => clustersLSD.predict(x))
var clusterPVMap = hrpvDataZI.zip(clusterPVPred)
var pvhgmRDD = clusterPVMap.map{r => (r._2, r._1._2)}.groupByKey
var arrHGinfo = pvhgmRDD.collect
// Post process data
// .....
hrpvData.unpersist()
}
}
Run Code Online (Sandbox Code Playgroud)
函数调用get24HoursPVF()
为 k-means 准备特征向量,大约需要 40 秒。每个循环大约需要 50 秒才能完成使用集群。我的数据大小为 2 到 3 GB(从表中读取)。给定 8000 个循环,运行这个 Spark 应用程序的总时间是不可接受的(8000x50s)。
既然每个设备都是独立的,那么有没有办法并行8000次迭代呢?或者如何利用集群来解决总运行时间长的问题?Scala Future 将无法工作,因为它只是几乎同时提交作业,但 Spark 不会同时运行这些作业。
除了 for 循环之外,您的代码中还有 2 个最慢的 Spark API 调用 -groupByKey
和collect
.
groupByKey 几乎不应该使用,而是查看reduceByKey
,查看此Databricks 博客了解更多详细信息。
collect
将该 RDD 中的所有数据传输到驱动程序节点上的数组,除非这是少量数据,否则会对性能产生相当大的影响。
在 for 循环中,我不是特别熟悉您要执行的操作,但是在
var hrpvData = get24HoursPVF(dataDF, id, t).cache()
Run Code Online (Sandbox Code Playgroud)
您正在为每个 id 和 t 值构建和缓存一个新的数据框。我不确定为什么您不能在开始时只构建一个包含 id 和 t 的每个变体的单个数据帧,然后在整个数据帧上运行 zipWithIndex、map 等?