如何在 Spark 中将 Dataframe 的 String 列转换为 Struct

roa*_*627 5 scala dataframe apache-spark apache-spark-sql

我目前正在使用结构化流来消费来自 Kafka 的消息

该消息的原始格式具有以下模式结构

root
 |-- incidentMessage: struct (nullable = true)
 |    |-- AssignedUnitEvent: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- CallNumber: string (nullable = true)
 |    |    |    |-- Code: string (nullable = true)
 |    |    |    |-- EventDateTime: string (nullable = true)
 |    |    |    |-- EventDispatcherID: string (nullable = true)
 |    |    |    |-- ID: string (nullable = true)
 |    |    |    |-- Notes: string (nullable = true)
 |    |    |    |-- PhoneNumberCalled: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- SubCallNumber: string (nullable = true)
 |    |    |    |-- SupItemNumber: string (nullable = true)
 |    |    |    |-- Type: string (nullable = true)
 |    |    |    |-- UnitID: string (nullable = true)
 |-- preamble: struct (nullable = true)
 |    |-- gateway: string (nullable = true)
 |    |-- product: string (nullable = true)
 |    |-- psap: string (nullable = true)
 |    |-- refDataVersion: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- uuid: string (nullable = true)
 |    |-- vendor: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- raw: string (nullable = true)

Run Code Online (Sandbox Code Playgroud)

但是,我在定义消息的模式(在流组件中)时犯了一个错误,并且我编写了将所有根列转换为字符串的代码。

这是我写的代码

//Define the schema

val schema1 = new StructType().add("preamble",DataTypes.StringType).add("incidentMessage",DataTypes.StringType).add("raw",DataTypes.StringType)

//Apply the schema to the message (payload)

val finalResult = Df.withColumn("FinalFrame",from_json($"payload",schema1)).select($"FinalFrame.*")
Run Code Online (Sandbox Code Playgroud)

现在我的数据框看起来像这样

scala> finalResult.printSchema
root
 |-- incidentMessage: string (nullable = true)
 |-- preamble: string (nullable = true)
 |-- raw: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我现在有一大堆具有不正确架构的消息。我尝试将正确的架构应用于我现在拥有的消息,但是写入文件系统的消息集具有可变架构(事件消息的嵌套列中存在更改)并且这种方法不起作用(我搞砸了,应该使用 Avro)

有没有办法恢复这些数据并使其保持正确的格式?

Rap*_*oth 5

虽然创建只有 1 个字段的 e 结构体没有多大意义,但您可以使用struct函数来做到这一点:

import org.apache.spark.sql.functions.struct

df.withColumn("incidentMessage",struct($"incidentMessage"))
Run Code Online (Sandbox Code Playgroud)