我的集群:1个主服务器,11个从服务器,每个节点有6 GB内存.
我的设置:
spark.executor.memory=4g, Dspark.akka.frameSize=512
Run Code Online (Sandbox Code Playgroud)
这是问题所在:
首先,我从HDFS到RDD读取了一些数据(2.19 GB):
val imageBundleRDD = sc.newAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)
其次,在这个RDD上做点什么:
val res = imageBundleRDD.map(data => {
val desPoints = threeDReconstruction(data._2, bg)
(data._1, desPoints)
})
Run Code Online (Sandbox Code Playgroud)
最后,输出到HDFS:
res.saveAsNewAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)
当我运行我的程序时,它显示:
.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO …Run Code Online (Sandbox Code Playgroud) 我试图使用本指南在EC2上使用Spark主机对常见爬网数据进行简单转换,我的代码如下所示:
package ccminer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object ccminer {
val english = "english|en|eng"
val spanish = "es|esp|spa|spanish|espanol"
val turkish = "turkish|tr|tur|turc"
val greek = "greek|el|ell"
val italian = "italian|it|ita|italien"
val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")
def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")
def main(args: Array[String]): Unit = {
if (args.length != 3) {
System.err.println("Bad command line")
System.exit(-1)
}
val cluster = "spark://???"
val sc = new SparkContext(cluster, "Common Crawl Miner", …Run Code Online (Sandbox Code Playgroud) 我有一个包含150 G txt文件的文件夹(大约700个文件,平均每个200 MB).
我正在使用scala处理文件并最终计算一些聚合统计信息.我看到两种可行的方法:
我倾向于第二种方法,因为它看起来更干净(不需要特定于并行化的代码),但我想知道我的方案是否适合我的硬件和数据所施加的限制.我有一个工作站,有16个线程和64 GB RAM可用(因此并行化将严格地在不同处理器核心之间本地化).我可能会在以后使用更多计算机扩展基础架构,但是现在我只想专注于调整这一个工作站场景的设置.
我正在使用的代码: - 读取TSV文件,并将有意义的数据提取到(String,String,String)三元组 - 然后执行一些过滤,映射和分组 - 最后,减少数据并计算一些聚合
我已经能够用一个单一的文件(〜200 MB的数据)来运行该代码,但是我收到java.lang.OutOfMemoryError:GC开销超过限制和/或Java进行添加更多的数据时,堆异常(在应用程序中断了6GB的数据,但我想将它与150 GB的数据一起使用).
我想我必须调整一些参数才能使其工作.我将不胜感激任何有关如何解决此问题的提示(如何调试内存需求).我已经尝试增加'spark.executor.memory'并使用较少数量的内核(理性的是每个内核需要一些堆空间),但这并没有解决我的问题.
我不需要解决方案非常快(如果需要,它可以轻松运行几个小时甚至几天).我也没有缓存任何数据,但最后只是将它们保存到文件系统中.如果您认为使用手动并行化方法更可行,我也可以这样做.