Rud*_*ven 5 scala amazon-s3 apache-spark aws-glue
对于我们的用例,我们需要从 S3 存储桶加载 json 文件。我们使用 AWS Glue 作为处理工具。但由于我们很快就会迁移到 Amazon EMR,因此我们已经在开发仅使用 Spark 功能的 Glue 作业。这样以后迁移会更容易。这意味着对于我们的用例,我们不能使用任何 Glue 功能,例如对输入文件进行分组。
我们面临的问题是,当我们读取这些 JSON 文件时,我们发现驱动程序的内存将达到 100%,直到最终作业因 OOM 异常而失败。
--conf spark.driver.memory=20g我们已经尝试通过使用 G.2X 实例并向Glue 作业添加参数来最大化驱动程序内存。
我们使用的代码很简单:
spark.read.option("inferSchema", value = true).json("s3://bucket_with_json/sub_folder")
Run Code Online (Sandbox Code Playgroud)
输入数据是21个json文件,大小为100MB。文件本身不是有效的 json 对象,但每个文件包含多个 json 对象。例如:
{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
}
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}
Run Code Online (Sandbox Code Playgroud)
(不是真实的数据集)
我们目前使用的胶水作业规格:
'--conf': 'spark.driver.maxResultSize=2g --conf spark.yarn.executor.memory=7g --conf spark.driver.memory=20g'此图显示了驱动程序的内存与执行程序的内存如何超过最大值:
+- 10 分钟后我们得到的错误是:
由于内存不足,命令失败
java.lang.OutOfMemoryError: Java 堆空间 -XX:OnOutOfMemoryError="kill -9 %p" 正在执行 /bin/sh -c "kill -9 8"...
另外值得注意的是,当我们运行较小的数据集时,一切正常。
此时我已经没有选择了。有人可以帮助我解决这个问题或为我指明正确的方向吗?另外,如果有人能解释为什么我的司机被淹了。我一直以为json文件是由执行者读取的。读入数据后,我没有向驱动程序收集任何数据,所以我无法解释为什么会发生这种情况。
** 编辑 **
我尝试将输入文件转换为一个有效的 json。因此转换为格式:
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
Run Code Online (Sandbox Code Playgroud)
并使用了选项:
.option("multiline", "true")
Run Code Online (Sandbox Code Playgroud)
但不幸的是这给了我同样的结果/错误..
**编辑**
我想补充一点,上面的数据示例及其结构与我正在使用的数据不同。向您提供一些有关我的数据的信息:
该结构非常嵌套。它包含 25 个顶级字段。其中7个是嵌套的。如果将所有内容展平,最终会得到 +- 200 个字段。该选项可能inferSchema是我的问题的原因。
我认为设置inferSchema == true可能是问题所在,因为这是在驱动程序中执行的。那么为什么要在读取时推断模式(需要额外传递数据,需要更多驱动程序资源)?也许这个玩具示例中丢失了原因,但也许您可以尝试这个?
首先...你的第二种文件格式工作得很好(第一个没有)...我创建了一些像这样的文件并将它们全部粘贴在 S3 上的一个文件夹中。
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
Run Code Online (Sandbox Code Playgroud)
我尝试的一种替代方法是在您阅读时自己提供架构。
import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType }
val schema = {
new StructType()
.add(StructField("RecordNumber", IntegerType, false))
.add(StructField("Zipcode", IntegerType, true))
.add(StructField("ZipCodeType", StringType, true))
.add(StructField("City", StringType, true))
.add(StructField("State", StringType, true))
}
val df = spark.read.option("multiline", value = true).schema(schema).json("s3://bucket/path/")
Run Code Online (Sandbox Code Playgroud)
另一件要尝试的事情......只是在阅读时跳过推断模式。我不知道以下内容是否以相同的方式使用驱动程序资源,但我似乎记得它可能使用一小部分行。
val df = spark.read.option("multiline", value = true).json("s3://bucket/path/")
val schema = df.schema
schema.printTreeString
df.printSchema
Run Code Online (Sandbox Code Playgroud)
编辑- 回应表明上述不好的评论
最后一件事要尝试...在这里,我只是想让驱动程序脱离混合状态,所以我正在执行以下操作...
.mapPartitions迭代每个分区并将分成多行的 JSON 合并为每个 JSON 字符串 1 条记录如果在此之后,您仍然遇到内存错误,则应该是在执行器上,您有更多的选择。
当然,如果您希望 Spark 自动将其读入 200 列数据帧,也许您只需要一个更大的驱动程序。
因此,这里是迭代文本行并尝试合并为每行上的单个记录的函数。这适用于玩具示例,但您可能需要做一些更聪明的事情。
.mapPartitions将每个分区视为一个迭代器...因此您需要为其提供一个类型的函数Iterator[A] => Iterator[B],在本例中该函数只是.foldLeft使用正则表达式来确定它是否是记录的末尾。
import org.apache.spark.rdd.RDD // RDD because that's what I use; probably similar on dataframes
import org.json4s._ // json4s for no particular reason
import org.json4s.jackson.JsonMethods._
/** `RDD.mapPartitions` treats each partition as an iterator
* so use a .foldLeft on the partition and a little regex
* merge multiple lines into one
*
* probably need something a smarter for more nested JSON
*/
val mergeJsonRecords: (Iterator[String] => Iterator[String]) = (oneRawPartition) => {
// val patternStart = "^\\[?\\{".r
val patternEnd = "(.*?\\})[,\\]]?$".r // end of JSON record
oneRawPartition
.foldLeft(List[String]())((list, next) => list match {
case Nil => List(next.trim.drop(1))
case x :: Nil => {
x.trim match {
case patternEnd(e) => List(next.trim, e)
case _ => List(x + next.trim)
}
}
case x :: xs => {
x.trim match {
case patternEnd(e) => next.trim :: e :: xs
case _ => x.trim + next.trim :: xs
}
}
})
.map { case patternEnd(e) => e; case x => x } // lame way to clean up last JSON in each partitions
.iterator
}
Run Code Online (Sandbox Code Playgroud)
这里只是读取数据...合并行...然后解析。再说一次,我使用 RDD 因为这是我通常使用的,但我确信如果需要,您可以将其保存在数据框中。
// read JSON in as plain text; each JSON record over multiple lines
val rdd: RDD[String] = spark.read.text("s3://bucket/path/").rdd.map(_.getAs[String](0))
rdd.count // 56 rows == 8 records
// one record per JSON object
val rdd2: RDD[String] = rdd.mapPartitions(mergeJsonRecords)
rdd2.collect.foreach(println)
rdd2.count // 8
// parsed JSON
object Parser extends Serializable {
implicit val formats = DefaultFormats
val func: (String => JValue) = (s) => parse(s)
}
val rddJson = rdd2.map(Parser.func)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1260 次 |
| 最近记录: |