Flatten only the deepest level in a Scala Spark DataFrame

myt*_*hic 1 json scala flatten apache-spark apache-spark-sql

I have a Spark job with a DataFrame containing the following values:

{
  "id": "abchchd",
  "test_id": "ndsbsb",
  "props": {
    "type": {
      "isMale": true,
      "id": "dd",
      "mcc": 1234,
      "name": "Adam"
    }
  }
}

{
  "id": "abc",
  "test_id": "asf",
  "props": {
    "type2": {
      "isMale": true,
      "id": "dd",
      "mcc": 12134,
      "name": "Perth"
    }
  }
}


I want to flatten it elegantly (without hard-coding key names, since the keys and types are unknown), so that props remains a struct but everything inside it is flattened, regardless of the nesting level.

The desired output is:

{
  "id": "abchchd",
  "test_id": "ndsbsb",
  "props": {
    "type.isMale": true,
    "type.id": "dd",
    "type.mcc": 1234,
    "type.name": "Adam"
  }
}

{
  "id": "abc",
  "test_id": "asf",
  "props": {
      "type2.isMale": true,
      "type2.id": "dd",
      "type2.mcc": 12134,
      "type2.name": "Perth"
  }
}

I used the solution mentioned in Automatically and Elegantly flatten DataFrame in Spark SQL.

However, I'm unable to keep the props field intact: it gets flattened as well. Can someone help me extend this solution?

The final schema should look like this:

root
 |-- id: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- type.id: string (nullable = true)
 |    |-- type.isMale: boolean (nullable = true)
 |    |-- type.mcc: long (nullable = true)
 |    |-- type.name: string (nullable = true)
 |    |-- type2.id: string (nullable = true)
 |    |-- type2.isMale: boolean (nullable = true)
 |    |-- type2.mcc: long (nullable = true)
 |    |-- type2.name: string (nullable = true)
 |-- test_id: string (nullable = true)
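To make the target shape concrete, the renaming can be sketched on plain Scala maps (a hypothetical flattenKeys helper, not Spark code): everything below props collapses into dotted keys, while props itself stays a single map-valued field.

```scala
// Hypothetical illustration of the desired key naming, using plain Scala
// maps instead of Spark Rows.
def flattenKeys(m: Map[String, Any], prefix: String = ""): Map[String, Any] =
  m.flatMap {
    case (k, v: Map[_, _]) =>
      // nested map: recurse, extending the dotted prefix
      flattenKeys(v.asInstanceOf[Map[String, Any]], prefix + k + ".")
    case (k, v) => Map(prefix + k -> v)
  }

val record = Map(
  "id"      -> "abchchd",
  "test_id" -> "ndsbsb",
  "props"   -> Map("type" -> Map("isMale" -> true, "id" -> "dd",
                                 "mcc" -> 1234, "name" -> "Adam"))
)

// Flatten only what is inside `props`; `props` remains a single field.
val flattened = record.map {
  case ("props", v: Map[_, _]) =>
    "props" -> flattenKeys(v.asInstanceOf[Map[String, Any]])
  case other => other
}
// flattened("props") now has keys "type.isMale", "type.id", "type.mcc", "type.name"
```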

bai*_*rek 5

I was able to achieve this with the RDD API:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, StructType}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import spark.implicits._ // for .toDS

val jsonRDD = df.rdd.map{row =>
  // Recursively convert a Row to a Map, collapsing everything under "props"
  // into dotted keys while leaving other structs/arrays nested.
  def unnest(r: Row): Map[String, Any] = {
    r.schema.fields.zipWithIndex.flatMap{case (f, i) =>
      (f.name, f.dataType) match {
        // "props": collapse each child struct's fields into "<child>.<field>" keys
        case ("props", _: StructType) =>
          val propsObject = r.getAs[Row](f.name)
          Map(f.name -> propsObject.schema.fields.flatMap{propsAttr =>
            val subObject = propsObject.getAs[Row](propsAttr.name)
            subObject.schema.fields.map{subField =>
              s"${propsAttr.name}.${subField.name}" -> subObject.get(subObject.fieldIndex(subField.name))
            }
          }.toMap)
        // any other struct: recurse
        case (fname, _: StructType) => Map(fname -> unnest(r.getAs[Row](fname)))
        // array of structs: recurse into each element
        case (fname, ArrayType(_: StructType, _)) => Map(fname -> r.getAs[Seq[Row]](fname).map(unnest))
        // leaf value: keep as-is
        case _ => Map(f.name -> r.get(i))
      }
    }.toMap
  }

  val asMap = unnest(row)
  new ObjectMapper().registerModule(DefaultScalaModule).writeValueAsString(asMap)
}

val finalDF = spark.read.json(jsonRDD.toDS).cache

Thanks to the recursion, the solution handles arbitrarily deeply nested input.
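The recursion can be mirrored on plain Scala collections (maps standing in for Rows, Seq for array columns) to see why any nesting depth works. This is a hypothetical analogue, not the Spark code itself:

```scala
// Plain-Scala analogue of unnest: the same three recursive cases appear.
// A `props` map gets its children collapsed to dotted keys; other maps and
// sequences of maps recurse; leaves pass through unchanged.
def unnestPlain(m: Map[String, Any]): Map[String, Any] = m.map {
  case ("props", v: Map[_, _]) =>
    "props" -> v.asInstanceOf[Map[String, Any]].flatMap {
      case (attr, sub: Map[_, _]) =>
        sub.asInstanceOf[Map[String, Any]].map { case (k, x) => s"$attr.$k" -> x }
      case (attr, other) => Map(attr -> other)
    }
  case (k, v: Map[_, _]) => k -> unnestPlain(v.asInstanceOf[Map[String, Any]])
  case (k, v: Seq[_]) =>
    k -> v.map {
      case e: Map[_, _] => unnestPlain(e.asInstanceOf[Map[String, Any]])
      case e            => e
    }
  case leaf => leaf
}

// Deeply nested sample: array -> struct -> struct -> props
val nested = Map(
  "newroot" -> Seq(
    Map("mystruct" -> Map(
      "id"    -> "abchchd",
      "props" -> Map("type" -> Map("mcc" -> 1234))
    ))
  )
)
// unnestPlain(nested) keeps the outer nesting but rewrites the innermost
// props to Map("type.mcc" -> 1234)
```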

With your data, we get:

finalDF.printSchema()
finalDF.show(false)
finalDF.select("props.*").show()

Output:

root
 |-- id: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- type.id: string (nullable = true)
 |    |-- type.isMale: boolean (nullable = true)
 |    |-- type.mcc: long (nullable = true)
 |    |-- type.name: string (nullable = true)
 |-- test_id: string (nullable = true)

+-------+----------------------+-------+
|id     |props                 |test_id|
+-------+----------------------+-------+
|abchchd|[dd, true, 1234, Adam]|ndsbsb |
+-------+----------------------+-------+

+-------+-----------+--------+---------+
|type.id|type.isMale|type.mcc|type.name|
+-------+-----------+--------+---------+
|     dd|       true|    1234|     Adam|
+-------+-----------+--------+---------+

But we can also pass it more nested/complex structures, for example:

val str2 = """{"newroot":[{"mystruct":{"id":"abchchd","test_id":"ndsbsb","props":{"type":{"isMale":true,"id":"dd","mcc":1234,"name":"Adam"}}}}]}"""

...

finalDF.printSchema()
finalDF.show(false)

which gives the following output:

root
 |-- newroot: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- mystruct: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- props: struct (nullable = true)
 |    |    |    |    |-- type.id: string (nullable = true)
 |    |    |    |    |-- type.isMale: boolean (nullable = true)
 |    |    |    |    |-- type.mcc: long (nullable = true)
 |    |    |    |    |-- type.name: string (nullable = true)
 |    |    |    |-- test_id: string (nullable = true)

+---------------------------------------------+
|newroot                                      |
+---------------------------------------------+
|[[[abchchd, [dd, true, 1234, Adam], ndsbsb]]]|
+---------------------------------------------+

EDIT: as you mentioned, if you have records with different structures, you need to wrap the subObject value above in an Option.
Here is the fixed unnest function:

def unnest(r: Row): Map[String, Any] = {
  r.schema.fields.zipWithIndex.flatMap{case (f, i) =>
    (f.name, f.dataType) match {
      case ("props", _: StructType) =>
        val propsObject = r.getAs[Row](f.name)
        Map(f.name -> propsObject.schema.fields.flatMap{propsAttr =>
          // getAs returns null for sub-structs absent from this record,
          // so wrap it in an Option and skip the None case
          val subObjectOpt = Option(propsObject.getAs[Row](propsAttr.name))
          subObjectOpt.toSeq.flatMap{subObject => subObject.schema.fields.map{subField =>
            s"${propsAttr.name}.${subField.name}" -> subObject.get(subObject.fieldIndex(subField.name))
          }}
        }.toMap)
      case (fname, _: StructType) => Map(fname -> unnest(r.getAs[Row](fname)))
      case (fname, ArrayType(_: StructType, _)) => Map(fname -> r.getAs[Seq[Row]](fname).map(unnest))
      case _ => Map(f.name -> r.get(i))
    }
  }.toMap
}

printSchema gives:

root
 |-- id: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- type.id: string (nullable = true)
 |    |-- type.isMale: boolean (nullable = true)
 |    |-- type.mcc: long (nullable = true)
 |    |-- type.name: string (nullable = true)
 |    |-- type2.id: string (nullable = true)
 |    |-- type2.isMale: boolean (nullable = true)
 |    |-- type2.mcc: long (nullable = true)
 |    |-- type2.name: string (nullable = true)
 |-- test_id: string (nullable = true)
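The reason the Option wrapper matters can be shown on plain Scala data (hypothetical values, maps standing in for Rows): once spark.read.json unions both records, every row carries both a type and a type2 slot, and the slot missing from a given record is null.

```scala
// Plain-Scala sketch of the null guard. In this record, `type2` is the
// slot that exists only in the other record, so its value is null.
val propsRow: Map[String, Any] = Map(
  "type"  -> Map("isMale" -> true, "id" -> "dd", "mcc" -> 1234, "name" -> "Adam"),
  "type2" -> null
)

val flattenedProps: Map[String, Any] = propsRow.flatMap { case (attr, sub) =>
  // Option(null) == None, so absent sub-structs are skipped
  // instead of throwing a NullPointerException
  Option(sub).toSeq.flatMap { s =>
    s.asInstanceOf[Map[String, Any]].map { case (k, v) => s"$attr.$k" -> v }
  }
}
// only the "type.*" keys survive; "type2" contributes nothing
```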