我有一组日志文件,我想读入RDD.这些日志文件都是压缩的gzip文件,文件名是日期戳.
我一直在sc.wholeTextFiles()阅读文件,似乎我一直在遇到Java堆内存问题.为了隔离问题,我决定在一台机器上针对单个文件运行它作为测试用例.
我从这里获得了这个文件:
http://dumps.wikimedia.org/other/pagecounts-raw/
以下是文件的大小,包括压缩和未压缩版本:
myuser@fembuntu$ ls -ltr pagecounts-20090505-180000*
-rw-rw-r-- 1 myuser myuser 65170192 Sep 20 2009 pagecounts-20090505-180000.gz
-rw-rw-r-- 1 myuser myuser 233007266 Jan 22 00:23 pagecounts-20090505-180000.txt
Run Code Online (Sandbox Code Playgroud)
并且机器上的可用内存如下:
myuser@fembuntu:~$ free -tm
total used free shared buffers cached
Mem: 4856 3018 1838 123 27 407
-/+ buffers/cache: 2583 2273
Swap: 5080 849 4231
Total: 9937 3867 6069
Run Code Online (Sandbox Code Playgroud)
所以我启动了spark-shell,给执行程序2G内存:
$ spark-shell --executor-memory 2G
scala> val pc_loc = "file:///home/myuser/data/pagecounts"
scala> val filename="/pagecounts-20090505-180000.gz"
filename: String = /pagecounts-20090505-180000.gz
Run Code Online (Sandbox Code Playgroud)
在这里,我通过数据读取sc.textFile()并显示前2行:
scala> var rdd=sc.textFile(pc_loc + filename)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:31
scala> rdd.take(2)
res0: Array[String] = Array(aa.b Help:Books 1 5263, aa.b Main_Page 1 5416)
Run Code Online (Sandbox Code Playgroud)
这很好.
在这里我使用sc.wholeTextFiles(),并在新行上拆分flatMapValues()以获得一对RDD,其中行是键值对.这些值对应于通过使用获得的RDD中的行sc.textFile().关键是文件路径.
scala> val pair_rdd=sc.wholeTextFiles(pc_loc + filename).flatMapValues(y => y.split("\n"))
pair_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at flatMapValues at <console>:31
Run Code Online (Sandbox Code Playgroud)
但是当我执行一个动作时出现堆错误:
scala> pair_rdd.take(2)
16/01/22 01:13:36 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
at java.nio.CharBuffer.allocate(CharBuffer.java:335)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:795)
at org.apache.hadoop.io.Text.decode(Text.java:412)
Run Code Online (Sandbox Code Playgroud)
谁能解释一下这里发生了什么?为什么flatMapValues分割行的调用似乎会破坏Java堆内存使用,从而导致堆错误?
您遇到的问题并非特定于textFilevs wholeTextFileswith flatMapValuesscenario.看起来你的程序甚至没有达到数据被压平的程度,我很确定你在调用时会得到相同的异常count而不是mapValues.
实际上只是创建大型对象的问题.请记住,wholeTextFiles必须立即读取文件的完整内容,它不能部分溢出到磁盘或部分垃圾收集.虽然200MB左右不是特别令人印象深刻,但是单个对象处理的却相当多.此外,它必须驻留在一台机器上,这意味着分配负载更加困难.
与此不同wholeTextFiles,textFile在这种特殊情况下提供更高的粒度.单个对象必须处理的数据少得多,如果不再需要,可以有效地进行垃圾回收.
忽略对象的大小,看起来你在本地模式下使用Spark.这意味着一切都由一个JVM处理.由于堆由所有线程共享,这意味着可用于实际处理的内存量可能低于您的预期.
最后你应该记住,只有一部分可用内存是为堆保留的.请参阅垃圾收集器人机工程学以及如何确定默认的Java堆大小?.如果必须处理大型对象,则始终可以使用-Xms/ -XmxJava选项覆盖默认的初始和最大堆大小.
| 归档时间: |
|
| 查看次数: |
4454 次 |
| 最近记录: |