roa*_*627 5 scala dataframe apache-spark apache-spark-sql
我目前正在使用结构化流来消费来自 Kafka 的消息
该消息的原始格式具有以下模式结构
root
|-- incidentMessage: struct (nullable = true)
| |-- AssignedUnitEvent: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- CallNumber: string (nullable = true)
| | | |-- Code: string (nullable = true)
| | | |-- EventDateTime: string (nullable = true)
| | | |-- EventDispatcherID: string (nullable = true)
| | | |-- ID: string (nullable = true)
| | | |-- Notes: string (nullable = true)
| | | |-- PhoneNumberCalled: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- SubCallNumber: string (nullable = true)
| | | |-- SupItemNumber: string (nullable = true)
| | | |-- Type: string (nullable = true)
| | | |-- UnitID: string (nullable = true)
|-- preamble: struct (nullable = true)
| |-- gateway: string (nullable = true)
| |-- product: string (nullable = true)
| |-- psap: string (nullable = true)
| |-- refDataVersion: long (nullable = true)
| |-- source: string (nullable = true)
| |-- timestamp: string (nullable = true)
| |-- uuid: string (nullable = true)
| |-- vendor: string (nullable = true)
| |-- version: string (nullable = true)
|-- raw: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
但是,我在定义消息的模式(在流组件中)时犯了一个错误,并且我编写了将所有根列转换为字符串的代码。
这是我写的代码
//Define the schema
val schema1 = new StructType().add("preamble",DataTypes.StringType).add("incidentMessage",DataTypes.StringType).add("raw",DataTypes.StringType)
//Apply the schema to the message (payload)
val finalResult = Df.withColumn("FinalFrame",from_json($"payload",schema1)).select($"FinalFrame.*")
Run Code Online (Sandbox Code Playgroud)
现在我的数据框看起来像这样
scala> finalResult.printSchema
root
|-- incidentMessage: string (nullable = true)
|-- preamble: string (nullable = true)
|-- raw: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我现在有一大堆具有不正确架构的消息。我尝试将正确的架构应用于我现在拥有的消息,但是写入文件系统的消息集具有可变架构(事件消息的嵌套列中存在更改)并且这种方法不起作用(我搞砸了,应该使用 Avro)
有没有办法恢复这些数据并使其保持正确的格式?
虽然创建只有 1 个字段的 e 结构体没有多大意义,但您可以使用struct函数来做到这一点:
import org.apache.spark.sql.functions.struct
df.withColumn("incidentMessage",struct($"incidentMessage"))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5919 次 |
| 最近记录: |