处理记录大小超过3GB的火花

DIN*_*GAN 5 hadoop memory-management scala apache-spark spark-dataframe

当单个记录大小超过3GB时,我会遇到异常

java.lang.IllegalArgumentException
App > at java.nio.CharBuffer.allocate(CharBuffer.java:330)
App > at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:792)
App > at org.apache.hadoop.io.Text.decode(Text.java:412)
App > at org.apache.hadoop.io.Text.decode(Text.java:389)
App > at org.apache.hadoop.io.Text.toString(Text.java:280)
App > at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$createBaseRdd$1.apply(JsonFileFormat.scala:135)
App > at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$createBaseRdd$1.apply(JsonFileFormat.scala:135)
Run Code Online (Sandbox Code Playgroud)

如何增加单个记录的缓冲区大小?

Oli*_*Oli 0

您的文件中可能有一大行包含该数组。这里你会得到一个异常,因为你试图构建一个太大的 CharBuffer(很可能是一个超出界限后变成负数的整数)。java 中的最大数组/字符串大小为 2^31-1 (Integer.MAX_VALUE -1)(请参阅此线程)。您说您有一条 3GB 的记录,每个字符 1B,即 30 亿个字符,超过 2^31,大约等于 20 亿。

你可以做的有点hacky,但由于你只有一把钥匙和一个大数组,所以它可能会起作用。您的 json 文件可能如下所示:

{
  "key" : ["v0", "v1", "v2"... ]
}
Run Code Online (Sandbox Code Playgroud)

或像这样,但我认为在你的情况下是前者:

{
  "key" : [
      "v0", 
      "v1", 
      "v2",
      ... 
   ]
}
Run Code Online (Sandbox Code Playgroud)

因此,您可以尝试将 hadoop 使用的行分隔符更改为“,”,如下所示。基本上,他们是这样做的:

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

def nlFile(path: String) = {
    val conf = new Configuration
    conf.set("textinputformat.record.delimiter", ",")
    sc.newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
          .map(_._2.toString)
}
Run Code Online (Sandbox Code Playgroud)

然后你可以读取你的数组,只需要自己删除 JSON 括号,如下所示:

nlFile("...")
  .map(_.replaceAll("^.*\\[", "").replaceAll("\\].*$",""))
Run Code Online (Sandbox Code Playgroud)

请注意,如果您的记录可以包含字符“[”和“]”,则您必须更加小心,但这是想法。