将大量JSON文件读入Spark Dataframe

Ani*_*pic 5 json scala dataframe apache-spark

我有一个大的嵌套NDJ(新行分隔的JSON)文件,我需要读入一个火花数据帧并保存到镶木地板.在尝试呈现模式时,我使用此函数:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
        schema.fields.flatMap(f => {
          val colName = if (prefix == null) f.name else (prefix + "." + f.name)
          f.dataType match {
            case st: StructType => flattenSchema(st, colName)
            case _ => Array(col(colName))
          }
        })
  }
Run Code Online (Sandbox Code Playgroud)

在通过读取返回的数据帧上

val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)

我也把它切换到val df = spark.read.json(path)这样,这只适用于NDJ而不是多行JSON - 同样的错误.

这导致工人的内存不足错误 java.lang.OutOfMemoryError: Java heap space.

我已经改变了jvm内存选项和spark执行器/驱动程序选项无济于事

有没有办法流式传输文件,展平架构,并逐步添加到数据框?JSON的某些行包含前面提到的新字段...因此需要稍后填写.

Ram*_*mzy 0

您可以通过多种方式实现这一目标。

首先,在读取时,您可以提供数据帧的架构来读取 json,或者您可以允许 Spark 自行推断架构。

一旦 json 进入 dataframe,您就可以按照以下方法对其进行扁平化。

A。在数据帧上使用explode() - 将其展平。b. 使用 Spark sql 并使用 访问嵌套字段。操作员。您可以在这里找到示例

最后,如果您想向数据框添加新列a。第一种选择,使用 withColumn() 是一种方法。但是,这将为添加的每个新列和整个数据集完成。b. 使用 sql 从现有的数据帧生成新的数据帧 - 这可能是最简单的 c。最后,使用映射,然后访问元素,获取旧模式,添加新值,创建新模式,最后获取新的 df - 如下所示

一个 withColumn 将在整个 rdd 上工作。因此,通常为要添加的每个列使用该方法并不是一个好习惯。有一种方法可以在地图函数中处理列及其数据。由于一个映射函数在这里完成这项工作,因此添加新列及其数据的代码将并行完成。

A。您可以根据计算收集新值

b. 将这些新列值添加到主 rdd 中,如下所示

val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)
Run Code Online (Sandbox Code Playgroud)

这里的row,是map方法中row的引用

C。创建新架构如下

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType))
Run Code Online (Sandbox Code Playgroud)

d. 添加到旧架构

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)
Run Code Online (Sandbox Code Playgroud)

e. 使用新列创建新数据框

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)
Run Code Online (Sandbox Code Playgroud)