在 Spark 中将 JSON 字符串转换为没有架构的结构列

JDe*_*Dev 13 schema struct scala apache-spark apache-spark-sql

火花:3.0.0
斯卡拉:2.12.8

我的数据框有一个包含 JSON 字符串的列,我想使用 StructType 从它创建一个新列。

临时 json 字符串
{“名称”:“测试”,“id”:“12”,“类别”:[{“产品”:[“A”,“B”],“displayName”:“test_1”,“displayLabel”:“ test1"},{"products":["C"],"displayName":"test_2","displayLabel":"test2"}],"createdAt":"","createdBy":""}
root
 |-- temp_json_string: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

格式化 JSON:

{
  "name":"test",
  "id":"12",
  "category":[
    {
      "products":[
        "A",
        "B"
      ],
      "displayName":"test_1",
      "displayLabel":"test1"
    },
    {
      "products":[
        "C"
      ],
      "displayName":"test_2",
      "displayLabel":"test2"
    }
  ],
  "createdAt":"",
  "createdBy":""
}
Run Code Online (Sandbox Code Playgroud)

我想创建一个 Struct 类型的新列,所以我尝试了:

dataFrame
     .withColumn("temp_json_struct", struct(col("temp_json_string")))
     .select("temp_json_struct")
Run Code Online (Sandbox Code Playgroud)

现在,我得到的架构为:

dataFrame
     .withColumn("temp_json_struct", struct(col("temp_json_string")))
     .select("temp_json_struct")
Run Code Online (Sandbox Code Playgroud)

期望的结果:

root
 |-- temp_json_struct: struct (nullable = false)
 |    |-- temp_json_string: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

小智 16

json_str_col是具有 JSON 字符串的列。我有多个文件,所以这就是第一行迭代每一行以提取架构的原因。如果您预先知道您的模式,那么只需替换json_schema它即可。

json_schema = spark.read.json(df.rdd.map(lambda row: row.json_str_col)).schema
df = df.withColumn('new_col', from_json(col('json_str_col'), json_schema))
Run Code Online (Sandbox Code Playgroud)


Joh*_*oha 6

// import spark implicits for conversion to dataset (.as[String])
import spark.implicits._

val df = ??? //create your dataframe having the 'temp_json_string' column

//convert Dataset[Row] aka Dataframe to Dataset[String]
val ds = df.select("temp_json_string").as[String]

//read as json
spark.read.json(ds)
Run Code Online (Sandbox Code Playgroud)

文档


abi*_*sis 5

至少有两种不同的方法来检索/发现给定 JSON 的架构。

为了便于说明,我们首先创建一些数据:

import org.apache.spark.sql.types.StructType

val jsData = Seq(
  ("""{
    "name":"test","id":"12","category":[
    {
      "products":[
        "A",
        "B"
      ],
      "displayName":"test_1",
      "displayLabel":"test1"
    },
    {
      "products":[
        "C"
      ],
      "displayName":"test_2",
      "displayLabel":"test2"
    }
  ],
  "createdAt":"",
  "createdBy":""}""")
)
Run Code Online (Sandbox Code Playgroud)

选项 1:schema_of_json

第一个选项是使用内置函数schema_of_json。该函数将以 DDL 格式返回给定 JSON 的架构:

val json = jsData.toDF("js").collect()(0).getString(0)

val ddlSchema: String = spark.sql(s"select schema_of_json('${json}')")
                            .collect()(0) //get 1st row
                            .getString(0) //get 1st col of the row as string
                            .replace("null", "string") //replace type with string, this occurs since you have "createdAt":"" 

// struct<category:array<struct<displayLabel:string,displayName:string,products:array<string>>>,createdAt:null,createdBy:null,id:string,name:string>

val schema: StructType = StructType.fromDDL(s"js_schema $ddlSchema")
Run Code Online (Sandbox Code Playgroud)

请注意,您希望这schema_of_json也适用于列级别,即: schema_of_json(js_col),不幸的是,这不能按预期工作,因此我们被迫传递一个字符串。

选项 2:使用 Spark JSON 读取器(推荐)

import org.apache.spark.sql.functions.from_json

val schema: StructType = spark.read.json(jsData.toDS).schema

// schema.printTreeString

// root
//  |-- category: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- displayLabel: string (nullable = true)
//  |    |    |-- displayName: string (nullable = true)
//  |    |    |-- products: array (nullable = true)
//  |    |    |    |-- element: string (containsNull = true)
//  |-- createdAt: string (nullable = true)
//  |-- createdBy: string (nullable = true)
//  |-- id: string (nullable = true)
//  |-- name: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,我们在这里生成一个基于 DDL 字符串的模式StructType,而不是像前面的情况那样基于 DDL 字符串。

发现模式后,我们可以继续下一步,即将 JSON 数据转换为结构。为了实现这一点,我们将使用from_json内置函数:

jsData.toDF("js")
      .withColumn("temp_json_struct", from_json($"js", schema))
      .printSchema()

// root
//  |-- js: string (nullable = true)
//  |-- temp_json_struct: struct (nullable = true)
//  |    |-- category: array (nullable = true)
//  |    |    |-- element: struct (containsNull = true)
//  |    |    |    |-- displayLabel: string (nullable = true)
//  |    |    |    |-- displayName: string (nullable = true)
//  |    |    |    |-- products: array (nullable = true)
//  |    |    |    |    |-- element: string (containsNull = true)
//  |    |-- createdAt: string (nullable = true)
//  |    |-- createdBy: string (nullable = true)
//  |    |-- id: string (nullable = true)
//  |    |-- name: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)