sc.textfile与sc.wholeTextFiles + flatMapValues的内存使用情况

fem*_*yte 4 apache-spark

我有一组日志文件,我想读入RDD.这些日志文件都是压缩的gzip文件,文件名是日期戳.

我一直在sc.wholeTextFiles()阅读文件,似乎我一直在遇到Java堆内存问题.为了隔离问题,我决定在一台机器上针对单个文件运行它作为测试用例.

我从这里获得了这个文件:

http://dumps.wikimedia.org/other/pagecounts-raw/

以下是文件的大小,包括压缩和未压缩版本:

 myuser@fembuntu$ ls -ltr pagecounts-20090505-180000*
 -rw-rw-r-- 1 myuser myuser  65170192 Sep 20  2009 pagecounts-20090505-180000.gz
-rw-rw-r-- 1 myuser myuser 233007266 Jan 22 00:23 pagecounts-20090505-180000.txt
Run Code Online (Sandbox Code Playgroud)

并且机器上的可用内存如下:

myuser@fembuntu:~$ free -tm

       total       used       free     shared    buffers     cached
Mem:    4856       3018       1838        123         27        407
-/+ buffers/cache: 2583       2273
 Swap:  5080        849       4231
Total:  9937       3867       6069
Run Code Online (Sandbox Code Playgroud)

所以我启动了spark-shell,给执行程序2G内存:

$ spark-shell --executor-memory 2G

scala> val pc_loc = "file:///home/myuser/data/pagecounts"
scala> val filename="/pagecounts-20090505-180000.gz"
filename: String = /pagecounts-20090505-180000.gz
Run Code Online (Sandbox Code Playgroud)

在这里,我通过数据读取sc.textFile()并显示前2行:

scala>  var rdd=sc.textFile(pc_loc + filename)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:31

scala> rdd.take(2)
res0: Array[String] = Array(aa.b Help:Books 1 5263, aa.b Main_Page 1 5416)
Run Code Online (Sandbox Code Playgroud)

这很好.

在这里我使用sc.wholeTextFiles(),并在新行上拆分flatMapValues()以获得一对RDD,其中行是键值对.这些值对应于通过使用获得的RDD中的行sc.textFile().关键是文件路径.

scala> val pair_rdd=sc.wholeTextFiles(pc_loc + filename).flatMapValues(y => y.split("\n"))
pair_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at flatMapValues at <console>:31
Run Code Online (Sandbox Code Playgroud)

但是当我执行一个动作时出现堆错误:

scala> pair_rdd.take(2)
16/01/22 01:13:36 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
at java.nio.CharBuffer.allocate(CharBuffer.java:335)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:795)
at org.apache.hadoop.io.Text.decode(Text.java:412)
Run Code Online (Sandbox Code Playgroud)

谁能解释一下这里发生了什么?为什么flatMapValues分割行的调用似乎会破坏Java堆内存使用,从而导致堆错误?

zer*_*323 8

您遇到的问题并非特定于textFilevs wholeTextFileswith flatMapValuesscenario.看起来你的程序甚至没有达到数据被压平的程度,我很确定你在调用时会得到相同的异常count而不是mapValues.

实际上只是创建大型对象的问题.请记住,wholeTextFiles必须立即读取文件的完整内容,它不能部分溢出到磁盘或部分垃圾收集.虽然200MB左右不是特别令人印象深刻,但是单个对象处理的却相当多.此外,它必须驻留在一台机器上,这意味着分配负载更加困难.

与此不同wholeTextFiles,textFile在这种特殊情况下提供更高的粒度.单个对象必须处理的数据少得多,如果不再需要,可以有效地进行垃圾回收.

忽略对象的大小,看起来你在本地模式下使用Spark.这意味着一切都由一个JVM处理.由于堆由所有线程共享,这意味着可用于实际处理的内存量可能低于您的预期.

最后你应该记住,只有一部分可用内存是为堆保留的.请参阅垃圾收集器人机工程学以及如何确定默认的Java堆大小?.如果必须处理大型对象,则始终可以使用-Xms/ -XmxJava选项覆盖默认的初始和最大堆大小.