Dyn*_*ite 5 performance scala hadoop-yarn apache-spark
我遇到了Spark应用程序问题。这是我的代码的简化版本:
def main(args: Array[String]) {
// Initializing spark context
val sc = new SparkContext()
val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)
// Getting files from TGZ archives
val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
logger.debug("Getting files from archive : "+tgzStream._1)
utils.getFilesFromTgzStream(tgzStream._2)
})
// We run the same process with 3 different "modes"
val modes = Seq("mode1", "mode2", "mode3")
// We cache the RDD before
val nb = filesRDD.cache().count()
logger.debug($nb + " files as input")
modes.map(mode => {
logger.debug("Processing files with mode : " + mode)
myProcessor.process(mode, filesRDD)
})
filesRDD.unpersist() // I tried with or without this
[...]
}
Run Code Online (Sandbox Code Playgroud)
生成的日志是(例如,输入3个档案):
从存档中获取文件:
从存档中获取文件:b
从存档中获取文件:c
3个文件作为输入
处理模式为mode1的文件
从存档中获取文件:
从存档中获取文件:b
从存档中获取文件:c
处理模式为mode2的文件
从存档中获取文件:
从存档中获取文件:b
从存档中获取文件:c
处理模式为mode3的文件
从存档中获取文件:
从存档中获取文件:b
从存档中获取文件:c
我的Spark配置:
What i understand from these logs is that the files extraction is performed 4 times instread of one ! This obviously leads me to Heap Space issues and performance leaks...
Am I doing something wrong ?
EDIT : I also tried to use modes.foreach(...) instead of map but nothing changed...
好的,经过大量测试,我终于解决了这个问题。事实上有2个问题:
\n\n我低估了输入数据的大小: Sparkcache或persist如果 RDD 太大而无法完全存储在总内存的 60% 中,函数就会效率低下,我知道这一点,但认为我的输入数据不是那么大,但在事实上我的 RDD 是 80GB。但我的 60% 内存(即 160GB)仍然超过 80GB,那么发生了什么?回答问题 n\xc2\xb02...
我的分区太大:在我的代码中,我的 RDD 的分区数量设置为 100,所以我有 100 个分区,每个分区 1.6GB。问题是我的数据由几十兆字节的字符串组成,所以我的分区没有满,10GB 的已用内存实际上只包含 7 或 8GB 的真实数据。
为了解决这些问题,我不得不使用persist(StorageLevel.MEMORY_SER)它增加计算时间但显着减少内存使用(根据这个基准)并将分区数设置为 1000(根据 Spark 文档,建议分区为 ~128MB)