When I run Spark KMeans on a large dataset, I always hit an OutOfMemory error. The training set is about 250 GB, and I have a 10-node Spark cluster where each machine has 16 CPUs and 150 GB of memory. I allocated 100 GB of memory on each node to the job, 50 CPUs in total. I set the number of cluster centers to 100 and the number of iterations to 5, but I get the OutOfMemory error when execution reaches the following line:
val model = KMeans.train(parsedData, numClusters, numIterations)
Is there any parameter I can tune to fix this? It works fine if I set a smaller number of cluster centers or a smaller number of iterations.
My code is as follows:
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Each input line is "features:label"; split once and keep both parts.
val originalData = sc.textFile("hdfs://host/input.txt").cache()
val tupleData = originalData.map { x => (x.split(":")(0), x.split(":")(1)) }
// Parse the comma-separated feature string into a dense vector.
val parsedData = tupleData.map { x => Vectors.dense(x._1.split(',').map(_.toDouble)) }
val model = KMeans.train(parsedData, numClusters, numIterations, 1, initializationMode = KMeans.RANDOM)
// Predict a cluster for each row and pair it with the original label.
val resultRdd = tupleData.map { p => (model.predict(Vectors.dense(p._1.split(',').map(_.toDouble))), p._2) }
resultRdd.sortByKey(true, 1).saveAsTextFile("hdfs://host/output.txt")
My input format is as follows:
0.0,0.0,91.8,21.67,0.0 ...
1.1,1.08,19.8,0.0,0.0 ...
0.0,0.08,19.8,0.0,0.0 ...
...
Each row has about 100K elements, and there are 600K rows in total.
The exception I get is as follows:
scheduler.DAGScheduler: Submitting ShuffleMapStage 42 (MapPartitionsRDD[49] at map at KmeansTest.scala:47), which has no missing parents
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
By default, Spark's KMeans implementation uses the K_MEANS_PARALLEL (k-means||) initialization mode. Part of this mode runs on the driver machine and, depending on your data, can be extremely slow and/or cause an OOM on the driver.
Try switching to the RANDOM initialization mode:
val model = KMeans.train(parsedData, numClusters, numIterations, 1, initializationMode = KMeans.RANDOM)
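The same setting also works with the builder-style mllib API. A minimal equivalent sketch, assuming parsedData, numClusters, and numIterations from your code above:

import org.apache.spark.mllib.clustering.KMeans

// Same training call via the builder API; RANDOM init skips the
// driver-heavy k-means|| initialization step.
val model = new KMeans()
  .setK(numClusters)
  .setMaxIterations(numIterations)
  .setInitializationMode(KMeans.RANDOM)
  .run(parsedData)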
Another thing to try is to increase the driver memory when you submit your application. For example, use the following command to set the driver memory to 4 GB:
spark-submit --conf "spark.driver.memory=4g" ...
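Equivalently, spark-submit has a dedicated flag that sets the same property:

spark-submit --driver-memory 4g ...

Note that spark.driver.memory must be set at submit time (or in spark-defaults.conf); setting it from within the application has no effect, because the driver JVM has already started by then.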