无法使用 Apache Spark 读取 AWS Glue 中的 json 文件

Rud*_*ven 5 scala amazon-s3 apache-spark aws-glue

对于我们的用例,我们需要从 S3 存储桶加载 json 文件。我们使用 AWS Glue 作为处理工具。但由于我们很快就会迁移到 Amazon EMR,因此我们已经在开发仅使用 Spark 功能的 Glue 作业。这样以后迁移会更容易。这意味着对于我们的用例,我们不能使用任何 Glue 功能,例如对输入文件进行分组

我们面临的问题是,当我们读取这些 JSON 文件时,我们发现驱动程序的内存将达到 100%,直到最终作业因 OOM 异常而失败。

--conf spark.driver.memory=20g我们已经尝试通过使用 G.2X 实例并向Glue 作业添加参数来最大化驱动程序内存。

我们使用的代码很简单:

spark.read.option("inferSchema", value = true).json("s3://bucket_with_json/sub_folder")
Run Code Online (Sandbox Code Playgroud)

输入数据是21个json文件,大小为100MB。文件本身不是有效的 json 对象,但每个文件包含多个 json 对象。例如:

{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
}
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}
Run Code Online (Sandbox Code Playgroud)

(不是真实的数据集)

我们目前使用的胶水作业规格:

  • 工人类型:G.2X
  • 工人人数:20人
  • 其他 Spark 参数:'--conf': 'spark.driver.maxResultSize=2g --conf spark.yarn.executor.memory=7g --conf spark.driver.memory=20g'
  • 工作语言:scala
  • 胶水版本:3.0

此图显示了驱动程序的内存与执行程序的内存如何超过最大值:

在此输入图像描述

+- 10 分钟后我们得到的错误是:

由于内存不足,命令失败

java.lang.OutOfMemoryError: Java 堆空间 -XX:OnOutOfMemoryError="kill -9 %p" 正在执行 /bin/sh -c "kill -9 8"...

另外值得注意的是,当我们运行较小的数据集时,一切正常。

此时我已经没有选择了。有人可以帮助我解决这个问题或为我指明正确的方向吗?另外,如果有人能解释为什么我的司机被淹了。我一直以为json文件是由执行者读取的。读入数据后,我没有向驱动程序收集任何数据,所以我无法解释为什么会发生这种情况。

** 编辑 **

我尝试将输入文件转换为一个有效的 json。因此转换为格式:

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]
Run Code Online (Sandbox Code Playgroud)

并使用了选项:

.option("multiline", "true")
Run Code Online (Sandbox Code Playgroud)

但不幸的是这给了我同样的结果/错误..

**编辑**

我想补充一点,上面的数据示例及其结构与我正在使用的数据不同。向您提供一些有关我的数据的信息:

该结构非常嵌套。它包含 25 个顶级字段。其中7个是嵌套的。如果将所有内容展平,最终会得到 +- 200 个字段。该选项可能inferSchema是我的问题的原因。

kmh*_*kmh 6

我认为设置inferSchema == true可能是问题所在,因为这是在驱动程序中执行的。那么为什么要在读取时推断模式(需要额外传递数据,需要更多驱动程序资源)?也许这个玩具示例中丢失了原因,但也许您可以尝试这个?

首先...你的第二种文件格式工作得很好(第一个没有)...我创建了一些像这样的文件并将它们全部粘贴在 S3 上的一个文件夹中。

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]
Run Code Online (Sandbox Code Playgroud)

我尝试的一种替代方法是在您阅读时自己提供架构。

import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType }

val schema = {
    new StructType()
      .add(StructField("RecordNumber", IntegerType, false))
      .add(StructField("Zipcode", IntegerType, true))
      .add(StructField("ZipCodeType", StringType, true))
      .add(StructField("City", StringType, true))
      .add(StructField("State", StringType, true))
  }

val df = spark.read.option("multiline", value = true).schema(schema).json("s3://bucket/path/")
Run Code Online (Sandbox Code Playgroud)

另一件要尝试的事情......只是在阅读时跳过推断模式。我不知道以下内容是否以相同的方式使用驱动程序资源,但我似乎记得它可能使用一小部分行。

val df = spark.read.option("multiline", value = true).json("s3://bucket/path/")

val schema = df.schema
schema.printTreeString

df.printSchema
Run Code Online (Sandbox Code Playgroud)

编辑- 回应表明上述不好的评论

最后一件事要尝试...在这里,我只是想让驱动程序脱离混合状态,所以我正在执行以下操作...

  1. 以纯文本形式读取,并包含多行 JSON 记录
  2. 用于.mapPartitions迭代每个分区并将分成多行的 JSON 合并为每个 JSON 字符串 1 条记录
  3. 最后...使用您最喜欢的解析器解析为 JSON(我使用 json4s 没有特殊原因)

如果在此之后,您仍然遇到内存错误,则应该是在执行器上,您有更多的选择。

当然,如果您希望 Spark 自动将其读入 200 列数据帧,也许您只需要一个更大的驱动程序。

因此,这里是迭代文本行并尝试合并为每行上的单个记录的函数。这适用于玩具示例,但您可能需要做一些更聪明的事情。

.mapPartitions将每个分区视为一个迭代器...因此您需要为其提供一个类型的函数Iterator[A] => Iterator[B],在本例中该函数只是.foldLeft使用正则表达式来确定它是否是记录的末尾。

import org.apache.spark.rdd.RDD // RDD because that's what I use; probably similar on dataframes
import org.json4s._             // json4s for no particular reason
import org.json4s.jackson.JsonMethods._

/** `RDD.mapPartitions` treats each partition as an iterator
  * so use a .foldLeft on the partition and a little regex
  * merge multiple lines into one
  * 
  * probably need something a smarter for more nested JSON
  */
val mergeJsonRecords: (Iterator[String] => Iterator[String]) = (oneRawPartition) => {
  // val patternStart = "^\\[?\\{".r
  val patternEnd = "(.*?\\})[,\\]]?$".r // end of JSON record
  oneRawPartition
    .foldLeft(List[String]())((list, next) => list match {
      case Nil => List(next.trim.drop(1))
      case x :: Nil => { 
        x.trim match {
          case patternEnd(e) => List(next.trim, e)
          case _ => List(x + next.trim)
        }
      }
      case x :: xs => {
        x.trim match {
          case patternEnd(e) => next.trim :: e :: xs
          case _ => x.trim + next.trim :: xs
        }
      }
    })
    .map { case patternEnd(e) => e; case x => x } // lame way to clean up last JSON in each partitions
    .iterator
}
Run Code Online (Sandbox Code Playgroud)

这里只是读取数据...合并行...然后解析。再说一次,我使用 RDD 因为这是我通常使用的,但我确信如果需要,您可以将其保存在数据框中。

// read JSON in as plain text; each JSON record over multiple lines
val rdd: RDD[String] = spark.read.text("s3://bucket/path/").rdd.map(_.getAs[String](0))
rdd.count // 56 rows == 8 records

// one record per JSON object
val rdd2: RDD[String] = rdd.mapPartitions(mergeJsonRecords)
rdd2.collect.foreach(println)
rdd2.count // 8

// parsed JSON
object Parser extends Serializable {
  implicit val formats = DefaultFormats
  val func: (String => JValue) = (s) => parse(s)
}
val rddJson = rdd2.map(Parser.func)
Run Code Online (Sandbox Code Playgroud)