如何在 DataFrame 中展开数组(来自 JSON)?

Alo*_*atz 4 scala dataframe apache-spark apache-spark-sql

RDD 中的每条记录都包含一个 json。我正在使用 SQLContext 从 Json 创建一个 DataFrame,如下所示:

val signalsJsonRdd = sqlContext.jsonRDD(signalsJson)
Run Code Online (Sandbox Code Playgroud)

下面是架构。datapayload 是一个项目数组。我想分解项目数组以获得一个数据框,其中每一行都是数据有效负载中的一个项目。我尝试根据这个答案做一些事情,但似乎我需要在case Row(arr: Array[...])语句中对项目的整个结构进行建模。我可能错过了一些东西。

val payloadDfs = signalsJsonRdd.explode($"data.datapayload"){ 
    case org.apache.spark.sql.Row(arr: Array[String]) =>  arr.map(Tuple1(_)) 
}
Run Code Online (Sandbox Code Playgroud)

上面的代码抛出了 scala.MatchError,因为实际 Row 的类型与 Row(arr: Array[String]) 有很大不同。可能有一种简单的方法可以做我想做的事,但我找不到它。请帮忙。

架构如下

signalsJsonRdd.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- dataid: string (nullable = true)
 |    |-- datapayload: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Reading: struct (nullable = true)
 |    |    |    |    |-- A2DPActive: boolean (nullable = true)
 |    |    |    |    |-- Accuracy: double (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Address: string (nullable = true)
 |    |    |    |    |-- Charging: boolean (nullable = true)
 |    |    |    |    |-- Connected: boolean (nullable = true)
 |    |    |    |    |-- DeviceName: string (nullable = true)
 |    |    |    |    |-- Guid: string (nullable = true)
 |    |    |    |    |-- HandsFree: boolean (nullable = true)
 |    |    |    |    |-- Header: double (nullable = true)
 |    |    |    |    |-- Heading: double (nullable = true)
 |    |    |    |    |-- Latitude: double (nullable = true)
 |    |    |    |    |-- Longitude: double (nullable = true)
 |    |    |    |    |-- PositionSource: long (nullable = true)
 |    |    |    |    |-- Present: boolean (nullable = true)
 |    |    |    |    |-- Radius: double (nullable = true)
 |    |    |    |    |-- SSID: string (nullable = true)
 |    |    |    |    |-- SSIDLength: long (nullable = true)
 |    |    |    |    |-- SpeedInKmh: double (nullable = true)
 |    |    |    |    |-- State: string (nullable = true)
 |    |    |    |    |-- Time: string (nullable = true)
 |    |    |    |    |-- Type: string (nullable = true)
 |    |    |    |-- Time: string (nullable = true)
 |    |    |    |-- Type: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

Jac*_*ski 5

tl;dr explode函数是你的朋友(或者我最喜欢的flatMap)。

explode函数为给定数组或映射列中的每个元素创建一个新行。

像下面这样的东西应该有效:

signalsJsonRdd.withColumn("element", explode($"data.datapayload"))
Run Code Online (Sandbox Code Playgroud)

请参阅函数对象。