How to parallelize a Spark Scala computation?

use*_*047 6 scala apache-spark apache-spark-mllib

I have code that computes the within-set sum of squared errors after clustering, most of which I took from the Spark MLlib source code.

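When I run the equivalent code through the Spark API, it runs as many different (distributed) jobs and completes successfully. When I run my own code (which should do the same thing as the Spark code), I get a stack overflow error. Any ideas why?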

Here is the code:

        import java.util.Arrays
        import org.apache.spark.mllib.linalg.{Vectors, Vector}
        import org.apache.spark.mllib.linalg._
        import org.apache.spark.mllib.linalg.distributed.RowMatrix
        import org.apache.spark.rdd.RDD
        import org.apache.spark.api.java.JavaRDD
        import breeze.linalg.{axpy => brzAxpy, inv, svd => brzSvd, DenseMatrix => BDM, DenseVector => BDV,
          MatrixSingularException, SparseVector => BSV, CSCMatrix => BSM, Matrix => BM}

        val EPSILON = {
            var eps = 1.0
            while ((1.0 + (eps / 2.0)) != 1.0) {
              eps /= 2.0
            }
            eps
          }

        def dot(x: Vector, y: Vector): Double = {
            require(x.size == y.size,
              "BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes:" +
              " x.size = " + x.size + ", y.size = " + y.size)
            (x, y) match {
              case (dx: DenseVector, dy: DenseVector) =>
                dot(dx, dy)
              case (sx: SparseVector, dy: DenseVector) =>
                dot(sx, dy)
              case (dx: DenseVector, sy: SparseVector) =>
                dot(sy, dx)
              case (sx: SparseVector, sy: SparseVector) =>
                dot(sx, sy)
              case _ =>
                throw new IllegalArgumentException(s"dot doesn't support (${x.getClass}, ${y.getClass}).")
            }
         }

         def fastSquaredDistance(
              v1: Vector,
              norm1: Double,
              v2: Vector,
              norm2: Double,
              precision: Double = 1e-6): Double = {
            val n = v1.size
            require(v2.size == n)
            require(norm1 >= 0.0 && norm2 >= 0.0)
            val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
            val normDiff = norm1 - norm2
            var sqDist = 0.0
            /*
             * The relative error is
             * <pre>
             * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),
             * </pre>
             * which is bounded by
             * <pre>
             * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).
             * </pre>
             * The bound doesn't need the inner product, so we can use it as a sufficient condition to
             * check quickly whether the inner product approach is accurate.
             */
            val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
            if (precisionBound1 < precision) {
              sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)
            } else if (v1.isInstanceOf[SparseVector] || v2.isInstanceOf[SparseVector]) {
              val dotValue = dot(v1, v2)
              sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0)
              val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) /
                (sqDist + EPSILON)
              if (precisionBound2 > precision) {
                sqDist = Vectors.sqdist(v1, v2)
              }
            } else {
              sqDist = Vectors.sqdist(v1, v2)
            }
            sqDist
        }

        def findClosest(
              centers: TraversableOnce[Vector],
              point: Vector): (Int, Double) = {
            var bestDistance = Double.PositiveInfinity
            var bestIndex = 0
            var i = 0
            centers.foreach { center =>
              // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary
              // distance computation.
              var lowerBoundOfSqDist = Vectors.norm(center, 2.0) - Vectors.norm(point, 2.0)
              lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
              if (lowerBoundOfSqDist < bestDistance) {
                val distance: Double = fastSquaredDistance(center, Vectors.norm(center, 2.0), point, Vectors.norm(point, 2.0))
                if (distance < bestDistance) {
                  bestDistance = distance
                  bestIndex = i
                }
              }
              i += 1
            }
            (bestIndex, bestDistance)
        }

         def pointCost(
              centers: TraversableOnce[Vector],
              point: Vector): Double =
            findClosest(centers, point)._2



        def clusterCentersIter: Iterable[Vector] =
            clusterCenters.map(p => p)


        def computeCostZep(indata: RDD[Vector]): Double = {
            val bcCenters = indata.context.broadcast(clusterCenters)
            indata.map(p => pointCost(bcCenters.value, p)).sum()
          }

        computeCostZep(projectedData)

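I believe I am using all the same parallelization as Spark, but it does not work for me. Any suggestions for getting my code to run distributed, or help understanding why the memory overflow happens in my code, would be very helpful.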

Here are links to the source code in Spark, which is very similar: KMeansModel and KMeans

This is the code that runs fine:

val clusters = KMeans.train(projectedData, numClusters, numIterations)

val clusterCenters = clusters.clusterCenters




// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(projectedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

Here is the error output:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 94.0 failed 4 times, most recent failure: Lost task 1.3 in stage 94.0 (TID 37663, ip-172-31-13-209.ec2.internal): java.lang.StackOverflowError
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.dot(:226)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.dot(:226)
    ...

And then further down:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1088)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1082)
    at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:34)
    at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:34)
    at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:34)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:33)

jav*_*dba 4

What is happening seems quite simple: you are calling the dot method recursively here:

def dot(x: Vector, y: Vector): Double = {
        require(x.size == y.size,
          "BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes:" +
          " x.size = " + x.size + ", y.size = " + y.size)
        (x, y) match {
          case (dx: DenseVector, dy: DenseVector) =>
            dot(dx, dy)
          case (sx: SparseVector, dy: DenseVector) =>
            dot(sx, dy)
          case (dx: DenseVector, sy: SparseVector) =>
            dot(sy, dx)
          case (sx: SparseVector, sy: SparseVector) =>
            dot(sx, sy)
          case _ =>
            throw new IllegalArgumentException(s"dot doesn't support (${x.getClass}, ${y.getClass}).")
        }
     }

Each recursive call to dot passes the same arguments as the previous one, so the recursion never terminates.

The stack trace tells you this as well. Note that the reported location is inside the dot method:

java.lang.StackOverflowError at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.dot(:226) at
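
One way to break the cycle, as a rough sketch rather than the actual MLlib implementation: have each branch of the match call a separately named helper instead of dot itself. In the question's code only dot(Vector, Vector) is defined, so every typed call such as dot(dx, dy) resolves back to that same method. To keep the example runnable without Spark, the types Vec, Dense and Sparse and the helpers denseDense, sparseDense and sparseSparse below are hypothetical stand-ins, not Spark classes or methods.

```scala
// Hypothetical stand-ins for MLlib's DenseVector / SparseVector (not Spark classes).
sealed trait Vec { def size: Int }
final case class Dense(values: Array[Double]) extends Vec {
  def size: Int = values.length
}
final case class Sparse(size: Int, indices: Array[Int], values: Array[Double]) extends Vec

object Dot {
  // Entry point: every branch calls a differently named helper,
  // so dot never calls itself and the recursion cannot loop.
  def dot(x: Vec, y: Vec): Double = {
    require(x.size == y.size, s"size mismatch: x.size = ${x.size}, y.size = ${y.size}")
    (x, y) match {
      case (dx: Dense, dy: Dense)   => denseDense(dx, dy)
      case (sx: Sparse, dy: Dense)  => sparseDense(sx, dy)
      case (dx: Dense, sy: Sparse)  => sparseDense(sy, dx)
      case (sx: Sparse, sy: Sparse) => sparseSparse(sx, sy)
    }
  }

  // Dense-dense: plain element-wise product sum.
  private def denseDense(x: Dense, y: Dense): Double = {
    var sum = 0.0
    var i = 0
    while (i < x.values.length) {
      sum += x.values(i) * y.values(i)
      i += 1
    }
    sum
  }

  // Sparse-dense: only the stored entries of the sparse vector contribute.
  private def sparseDense(x: Sparse, y: Dense): Double = {
    var sum = 0.0
    var k = 0
    while (k < x.indices.length) {
      sum += x.values(k) * y.values(x.indices(k))
      k += 1
    }
    sum
  }

  // Sparse-sparse: merge the two index arrays (assumed sorted ascending).
  private def sparseSparse(x: Sparse, y: Sparse): Double = {
    var sum = 0.0
    var i = 0
    var j = 0
    while (i < x.indices.length && j < y.indices.length) {
      if (x.indices(i) == y.indices(j)) {
        sum += x.values(i) * y.values(j)
        i += 1
        j += 1
      } else if (x.indices(i) < y.indices(j)) {
        i += 1
      } else {
        j += 1
      }
    }
    sum
  }
}
```

With real MLlib vectors the same shape should carry over: keep dot(x: Vector, y: Vector) as the dispatcher and give the typed cases their own implementations, under different names or as genuinely distinct overloads.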