Mun*_*esh 6 hadoop apache-spark pyspark
我正在寻找一个通用的解决方案来从 JSON 字符串列中提取所有 json 字段作为列。
df = spark.read.load(path)
df.show()
Run Code Online (Sandbox Code Playgroud)
“路径”中文件的文件格式是 parquet
样本数据
|id | json_data
| 1 | {"name":"abc", "depts":["dep01", "dep02"]}
| 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
| 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}
Run Code Online (Sandbox Code Playgroud)
预期产出
|id | name | depts | sal | address_city | address_state
| 1 | "abc" | ["dep01", "dep02"] | null| null | null
| 2 | "xyz" | ["dep03"] | 100 | null | null
| 3 | "pqr" | ["dep02"] | null| "SF" | "CA"
Run Code Online (Sandbox Code Playgroud)
我知道我可以通过创建定义了架构的 StructType 并使用“from_json”方法来提取列。
但这种方法需要手动模式定义。
val myStruct = StructType(
Seq(
StructField("name", StringType),
StructField("depts", ArrayType(StringType)),
StructField("sal", IntegerType)
))
var newDf = df.withColumn("depts", from_json(col("depts"), myStruct))
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法来展平 JSON 列而无需手动定义架构?在提供的示例中,我可以看到可用的 JSON 字段。但实际上,我无法遍历所有行来查找所有字段。
因此,我正在寻找一种解决方案,将所有字段拆分为列,而不指定列的名称或类型。
根据 @Gaurang Shah 的回答,我实现了一个解决方案来处理嵌套 JSON 结构,并修复了使用 monotonically_increasing_id(Non-sequential) 的问题
在此方法中,“populateColumnName”函数递归地检查 StructType 列并填充列名称。
'renameColumns' 函数通过替换 '.' 来重命名列 用 '_' 来标识嵌套的 json 字段。
'addIndex' 函数将索引添加到数据帧,以在解析 JSON 列后加入数据帧。
def flattenJSON(df : DataFrame, columnName: String) : DataFrame = {
val indexCol = "internal_temp_id"
def populateColumnName(col : StructField) : Array[String] = {
col.dataType match {
case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
case rest => Array(col.name)
}
}
def renameColumns(name : String) : String = {
if(name contains ".") {
name + " as " + name.replaceAll("\\.", "_")
}
else name
}
def addIndex(df : DataFrame) : DataFrame = {
// Append "rowid" column of type Long
val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))
// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex
// Convert back to DataFrame
spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
}
val dfWithID = addIndex(df)
val jsonDF = df.select(columnName)
val ds = jsonDF.rdd.map(_.getString(0)).toDS
val parseDF = spark.read.option("inferSchema",true).json(ds)
val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)
var resultDF = parseDF.selectExpr(columnNames:_*)
val jsonDFWithID = addIndex(resultDF)
val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)
joinDF
}
val res = flattenJSON(jsonDF, "address")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8222 次 |
| 最近记录: |