Creating a Spark DataFrame from a nested JSON file in Scala

dev*_*t13 3 scala nested dataframe apache-spark apache-spark-sql

I have a JSON file that looks like this

{
"group" : {},
"lang" : [ 
    [ 1, "scala", "functional" ], 
    [ 2, "java","object" ], 
    [ 3, "py","interpreted" ]
]
}

I tried to create a DataFrame with

val path = "some/path/to/jsonFile.json"
val df = sqlContext.read.json(path)
df.show()

When I run this I get

df: org.apache.spark.sql.DataFrame = [_corrupt_record: string]

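How can we create a DataFrame from the contents of the "lang" key? I don't care about group {}; all I need is to extract the data from "lang" and apply a case class like this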

// `type` is a reserved word in Scala, so the field name needs backticks
case class ProgLang(id: Int, lang: String, `type`: String)

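I have read the post "Reading JSON with Apache Spark - `corrupt_record`" and understand that each record needs to be on its own line, but in my case I cannot change the file structure.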

Ram*_*jan 7

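The JSON is not in the layout that the json API of sqlContext expects. By default the reader treats every line as one complete record, so a document spread over several lines is read as a corrupt record. The form it can read is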

{"group":{},"lang":[[1,"scala","functional"],[2,"java","object"],[3,"py","interpreted"]]}

Assuming you have that in a file ("/home/test.json"), you can use the following to get the dataframe you want

import org.apache.spark.sql.functions._
import sqlContext.implicits._

val df = sqlContext.read.json("/home/test.json")

val df2 = df.withColumn("lang", explode($"lang"))
    .withColumn("id", $"lang"(0))
    .withColumn("langs", $"lang"(1))
    .withColumn("type", $"lang"(2))
    .drop("lang")
    .withColumnRenamed("langs", "lang")

// show() returns Unit, so call it separately to keep df2 a DataFrame
df2.show(false)

You should get

+---+-----+-----------+
|id |lang |type       |
+---+-----+-----------+
|1  |scala|functional |
|2  |java |object     |
|3  |py   |interpreted|
+---+-----+-----------+
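The question also asks for the rows as the `ProgLang` case class, which the snippet above does not cover. Here is a minimal sketch of one way to do it, assuming Spark 2.2 or later with a `SparkSession` (on older versions `sqlContext` and an RDD of strings would take its place). The object name `ProgLangExample` and the helper `toProgLangs` are made up for illustration. Because the inner arrays mix numbers and strings, Spark infers their elements as strings, so `id` is cast to an integer before the conversion.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.explode

// `type` is a reserved word in Scala, so the field name needs backticks.
// The case class is top-level so that Spark can derive an encoder for it.
case class ProgLang(id: Int, lang: String, `type`: String)

object ProgLangExample {
  // Hypothetical helper: parses one single-line JSON document and
  // returns the entries of its "lang" array as typed rows.
  def toProgLangs(spark: SparkSession, json: String): Dataset[ProgLang] = {
    import spark.implicits._
    spark.read
      .json(Seq(json).toDS())            // Dataset[String] input needs Spark 2.2+
      .select(explode($"lang").as("l"))  // one row per inner array
      .select(
        $"l"(0).cast("int").as("id"),    // elements are inferred as strings
        $"l"(1).as("lang"),
        $"l"(2).as("type"))
      .as[ProgLang]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ProgLangExample")
      .master("local[*]")                // assumption: local run for the sketch
      .getOrCreate()

    // Same content as the single-line JSON above, kept in memory.
    val json =
      """{"group":{},"lang":[[1,"scala","functional"],[2,"java","object"],[3,"py","interpreted"]]}"""

    toProgLangs(spark, json).show(false)
    spark.stop()
  }
}
```

The result is a `Dataset[ProgLang]`, so later steps can use typed operations such as `map` or `filter` on the case class fields.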

Update

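If you do not want to change the input JSON format, as mentioned in the comments below, you can use wholeTextFiles to read the JSON file and parse it as follows. wholeTextFiles returns (path, content) pairs, so taking the content and stripping the newlines turns the whole file into a single-line record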

import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

val readJSON = sc.wholeTextFiles("/home/test.json")
  .map(x => x._2)
  .map(data => data.replaceAll("\n", ""))

val df = sqlContext.read.json(readJSON)

val df2 = df.withColumn("lang", explode($"lang"))
  .withColumn("id", $"lang"(0).cast(IntegerType))
  .withColumn("langs", $"lang"(1))
  .withColumn("type", $"lang"(2))
  .drop("lang")
  .withColumnRenamed("langs", "lang")

df2.show(false)
df2.printSchema

This should give you the same dataframe as above, with the schema

root
 |-- id: integer (nullable = true)
 |-- lang: string (nullable = true)
 |-- type: string (nullable = true)
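As a different route from the wholeTextFiles approach above, Spark 2.2 and later have a `multiLine` option on the JSON reader that parses a file whose records span several lines, so the file can stay as it is. The sketch below assumes such a version and a `SparkSession`. The object name `MultiLineJsonExample` and the helper `readLangs` are made up for illustration, and the path is the one used in this answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.explode

object MultiLineJsonExample {
  // Hypothetical helper: reads a pretty-printed JSON file and flattens
  // the "lang" array into id / lang / type columns.
  def readLangs(spark: SparkSession, path: String): DataFrame = {
    import spark.implicits._
    spark.read
      .option("multiLine", true)         // let one record span many lines
      .json(path)
      .select(explode($"lang").as("l"))  // one row per inner array
      .select(
        $"l"(0).cast("int").as("id"),    // elements are inferred as strings
        $"l"(1).as("lang"),
        $"l"(2).as("type"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MultiLineJsonExample")
      .master("local[*]")                // assumption: local run for the sketch
      .getOrCreate()

    // Path taken from the answer; replace it with the real file location.
    val df = readLangs(spark, "/home/test.json")
    df.show(false)
    df.printSchema()
    spark.stop()
  }
}
```

With `multiLine` enabled each file is parsed as a whole, so this suits files of moderate size, much like wholeTextFiles.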