将 JSON 字符串列拆分为多列

Mun*_*esh 6 hadoop apache-spark pyspark

我正在寻找一个通用的解决方案来从 JSON 字符串列中提取所有 json 字段作为列。

df =  spark.read.load(path)
df.show()
Run Code Online (Sandbox Code Playgroud)

“路径”中文件的文件格式是 parquet

样本数据

|id | json_data
| 1 | {"name":"abc", "depts":["dep01", "dep02"]}
| 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
| 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}
Run Code Online (Sandbox Code Playgroud)

预期产出

|id | name    | depts              | sal | address_city | address_state
| 1 | "abc"   | ["dep01", "dep02"] | null| null         | null
| 2 | "xyz"   | ["dep03"]          | 100 | null         | null
| 3 | "pqr"   | ["dep02"]          | null| "SF"         | "CA"
Run Code Online (Sandbox Code Playgroud)

我知道我可以通过创建定义了架构的 StructType 并使用“from_json”方法来提取列。

但这种方法需要手动模式定义。

val myStruct = StructType(
  Seq(
    StructField("name", StringType),
    StructField("depts", ArrayType(StringType)),
    StructField("sal", IntegerType)
  ))

var newDf = df.withColumn("depts", from_json(col("depts"), myStruct))
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法来展平 JSON 列而无需手动定义架构?在提供的示例中,我可以看到可用的 JSON 字段。但实际上,我无法遍历所有行来查找所有字段。

因此,我正在寻找一种解决方案,将所有字段拆分为列,而不指定列的名称或类型。

Mun*_*esh 1

根据 @Gaurang Shah 的回答,我实现了一个解决方案来处理嵌套 JSON 结构,并修复了使用 monotonically_increasing_id(Non-sequential) 的问题

在此方法中,“populateColumnName”函数递归地检查 StructType 列并填充列名称。

'renameColumns' 函数通过替换 '.' 来重命名列 用 '_' 来标识嵌套的 json 字段。

'addIndex' 函数将索引添加到数据帧,以在解析 JSON 列后加入数据帧。

def flattenJSON(df : DataFrame, columnName: String) : DataFrame = {

    val indexCol = "internal_temp_id"

    def populateColumnName(col : StructField) : Array[String] = {
        col.dataType match {
          case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
          case rest         => Array(col.name)
        }
    }

    def renameColumns(name : String) : String = {
        if(name contains ".") {
            name + " as " + name.replaceAll("\\.", "_")
        }
        else name
    }

    def addIndex(df : DataFrame) : DataFrame = {

        // Append "rowid" column of type Long
        val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))

        // Zip on RDD level
        val rddWithId = df.rdd.zipWithIndex
        // Convert back to DataFrame
        spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
    }

    val dfWithID = addIndex(df)

    val jsonDF = df.select(columnName)

    val ds = jsonDF.rdd.map(_.getString(0)).toDS
    val parseDF = spark.read.option("inferSchema",true).json(ds)

    val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)

    var resultDF = parseDF.selectExpr(columnNames:_*)

    val jsonDFWithID = addIndex(resultDF)

    val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)

    joinDF
}

val res = flattenJSON(jsonDF, "address")
Run Code Online (Sandbox Code Playgroud)