Spark Caching:RDD只缓存了8%

Moh*_*itt 11 memory-management scala distributed-computing apache-spark rdd

对于我的代码段如下:

val levelsFile = sc.textFile(levelsFilePath)
val levelsSplitedFile = levelsFile.map(line => line.split(fileDelimiter, -1))
val levelPairRddtemp = levelsSplitedFile
                              .filter(linearr => ( linearr(pogIndex).length!=0))
                              .map(linearr => (linearr(pogIndex).toLong, levelsIndexes.map(x => linearr(x))
                              .filter(value => (!value.equalsIgnoreCase("") && !value.equalsIgnoreCase(" ") && !value.equalsIgnoreCase("null")))))
                              .mapValues(value => value.mkString(","))
                              .partitionBy(new HashPartitioner(24))
                              .persist(StorageLevel.MEMORY_ONLY_SER)

levelPairRddtemp.count // just to trigger rdd creation
Run Code Online (Sandbox Code Playgroud)

信息

  1. 文件的大小是~4G
  2. 我使用2 executors(每个5G)和12个核心.
  3. Spark 版本:1.5.2

问题

当我看到它SparkUIStorage tab,我看到的是:

在此输入图像描述

在里面RDD看来,24个partitions中只有2个被缓存.

在此输入图像描述

对此行为的任何解释,以及如何解决此问题.

编辑1:我刚尝试使用60个分区HashPartitioner作为:

..
.partitionBy(new HashPartitioner(60))
..
Run Code Online (Sandbox Code Playgroud)

工作了.现在我得到了整个RDD缓存.有什么猜测这里可能发生了什么?数据偏差是否会导致此行为?

编辑-2:包含BlockManagerInfo我再次使用24运行的日志partitions.这次3/24 partitions被缓存:

16/03/17 14:15:28 INFO BlockManagerInfo: Added rdd_294_14 in memory on ip-10-1-34-66.ec2.internal:47526 (size: 107.3 MB, free: 2.6 GB) 
16/03/17 14:15:30 INFO BlockManagerInfo: Added rdd_294_17 in memory on ip-10-1-34-65.ec2.internal:57300 (size: 107.3 MB, free: 2.6 GB) 
16/03/17 14:15:30 INFO BlockManagerInfo: Added rdd_294_21 in memory on ip-10-1-34-65.ec2.internal:57300 (size: 107.4 MB, free: 2.5 GB)
Run Code Online (Sandbox Code Playgroud)

gsa*_*ras 1

我相信发生这种情况是因为达到了内存限制,或者更重要的是,您使用的内存选项不允许您的工作利用所有资源。

增加 #partitions 意味着减少每个任务的大小,这可能解释了这种行为。