Apache Spark中的大型RDD [MatrixEntry]超出了GC开销限制

Aks*_*ngh 2 garbage-collection scala apache-spark

我有一个csv文件存储了维度的用户项数据6,365x214,我通过使用columnSimilarities()来找到用户 - 用户的相似性org.apache.spark.mllib.linalg.distributed.CoordinateMatrix.

我的代码看起来像这样:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{RowMatrix, 
MatrixEntry, CoordinateMatrix}
import org.apache.spark.rdd.RDD

def rddToCoordinateMatrix(input_rdd: RDD[String]) : CoordinateMatrix = {

    // Convert RDD[String] to RDD[Tuple3]
    val coo_matrix_input: RDD[Tuple3[Long,Long,Double]] = input_rdd.map(
        line => line.split(',').toList
    ).map{
            e => (e(0).toLong, e(1).toLong, e(2).toDouble)
    }

    // Convert RDD[Tuple3] to RDD[MatrixEntry]
    val coo_matrix_matrixEntry: RDD[MatrixEntry] = coo_matrix_input.map(e => MatrixEntry(e._1, e._2, e._3))

    // Convert RDD[MatrixEntry] to CoordinateMatrix
    val coo_matrix: CoordinateMatrix  = new CoordinateMatrix(coo_matrix_matrixEntry)

    return coo_matrix
}

// Read CSV File to RDD[String]
val input_rdd: RDD[String] = sc.textFile("user_item.csv")

// Read RDD[String] to CoordinateMatrix
val coo_matrix = rddToCoordinateMatrix(input_rdd)

// Transpose CoordinateMatrix
val coo_matrix_trans = coo_matrix.transpose()

// Convert CoordinateMatrix to RowMatrix
val mat: RowMatrix = coo_matrix_trans.toRowMatrix()

// Compute similar columns perfectly, with brute force
// Return CoordinateMatrix
val simsPerfect: CoordinateMatrix = mat.columnSimilarities()

// CoordinateMatrix to RDD[MatrixEntry]
val simsPerfect_entries = simsPerfect.entries

simsPerfect_entries.count()

// Write results to file
val results_rdd = simsPerfect_entries.map(line => line.i+","+line.j+","+line.value)

results_rdd.saveAsTextFile("similarity-output")

// Close the REPL terminal
System.exit(0)
Run Code Online (Sandbox Code Playgroud)

并且,当我在spark-shell上运行此脚本时 ,运行代码行后出现以下错误simsPerfect_entries.count():

java.lang.OutOfMemoryError: GC overhead limit exceeded
Run Code Online (Sandbox Code Playgroud)

更新:

我尝试过其他人已经提供的许多解决方案,但我没有成功.

1通过增加每个执行程序进程使用的内存量 spark.executor.memory=1g

2通过减少用于驱动程序进程的核心数量 spark.driver.cores=1

建议我解决这个问题的方法.

shu*_*tty 6

所有Spark转换都是惰性的,直到您实际实现它为止.当您定义RDD到RDD数据操作时,Spark只是将操作链接在一起,而不是执行实际计算.因此,当您致电时simsPerfect_entries.count(),执行操作链并获得您的号码.

错误GC overhead limit exceeded意味着JVM垃圾收集器活动太高,以至于代码执行停止.由于以下原因,GC活动可能很高:

  • 您生成太多小物件并立即丢弃它们.看起来你不是.
  • 您的数据不适合您的JVM堆.就像你试图将2GB文本文件加载到RAM中一样,但只有1GB的JVM堆.看起来这是你的情况.

要解决此问题,请尝试增加JVM堆的数量:

  • 如果您有分布式Spark设置,则为您的工作节点.
  • 你的火花壳应用程序.

  • 感谢您的回答.我通过在spark-shell中添加一个额外的标志来增加`driver-memory`来解决我的问题,默认情况下它是**1g**.所以.我把它增加到**4g**.*$ spark-shell - driver-memory 4g* (2认同)