I am using Spark 2.4.3 with Scala 2.11.

Below is my current JSON string in a DataFrame column. I am trying to store the schema of that JSON string in another column using the `schema_of_json` function, but it throws the error below. How can I fix this?
{
  "company": {
    "companyId": "123",
    "companyName": "ABC"
  },
  "customer": {
    "customerDetails": {
      "customerId": "CUST-100",
      "customerName": "CUST-AAA",
      "status": "ACTIVE",
      "phone": {
        "phoneDetails": {
          "home": {
            "phoneno": "666-777-9999"
          },
          "mobile": {
            "phoneno": "333-444-5555"
          }
        }
      }
    },
    "address": {
      "loc": "NORTH",
      "adressDetails": [
        {
          "street": "BBB",
          "city": "YYYYY",
          "province": "AB",
          "country": "US"
        },
        {
          "street": "UUU",
          "city": "GGGGG",
          "province": "NB",
          "country": "US"
        }
      ]
    }
  }
}

I want to replace all `n/a` values in the dataframe below with `unknown`. A value may sit in a scalar column or in a complex nested column. If they were only StructField columns I could iterate over the columns and replace `n/a` using `withColumn`, but I want to do this in a generic way regardless of column type, because in my case there are about 100 columns and I don't want to name each one explicitly.
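On the `schema_of_json` error in the first question: in Spark 2.4, `schema_of_json` only accepts a foldable (constant) argument, so calling it on a regular column such as `schema_of_json($"json")` raises an `AnalysisException`. A common workaround, sketched below (the column name `json` is an assumption, not from the original post), is to infer the schema once from a sample value and then parse the whole column with `from_json`:

```scala
import org.apache.spark.sql.functions.{col, from_json, lit, schema_of_json}

// schema_of_json must receive a foldable (literal) argument in Spark 2.4,
// so we feed it one sample value instead of the whole column.
val sampleJson: String = df.select(col("json")).head.getString(0)

// The inferred schema comes back as a DDL-formatted string ...
val ddlSchema: String = df
  .select(schema_of_json(lit(sampleJson)))
  .head
  .getString(0)

// ... which from_json accepts directly when parsing every row.
val parsed = df.withColumn(
  "parsed",
  from_json(col("json"), ddlSchema, Map.empty[String, String])
)
```

This assumes every row shares the structure of the sampled row; if the JSON varies between rows, the inferred schema will only cover the fields present in the sample.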
case class Bar(x: Int, y: String, z: String)
case class Foo(id: Int, name: String, status: String, bar: Seq[Bar])
val df = spark.sparkContext.parallelize(
Seq(
Foo(123, "Amy", "Active", Seq(Bar(1, "first", "n/a"))),
Foo(234, "Rick", "n/a", Seq(Bar(2, "second", "fifth"),Bar(22, "second", "n/a"))),
Foo(567, "Tom", "null", Seq(Bar(3, "second", "sixth")))
)).toDF
df.printSchema
df.show(20, false)
Result:
+---+----+------+---------------------------------------+
|id |name|status|bar |
+---+----+------+---------------------------------------+
|123|Amy |Active|[[1, …
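One schema-agnostic approach (a sketch, not the only option): serialize each row to JSON, do a text-level replacement, and parse it back with the DataFrame's own schema. This reaches arbitrarily nested fields, including structs inside arrays, without naming a single column:

```scala
import org.apache.spark.sql.functions.{col, from_json, regexp_replace, struct, to_json}

// 1. Pack the whole row into a single JSON string.
// 2. Replace the exact string value "n/a" (with its surrounding quotes)
//    by "unknown".
// 3. Parse back using the original schema and flatten the struct again.
val cleaned = df
  .select(to_json(struct(df.columns.map(col): _*)).as("json"))
  .select(regexp_replace(col("json"), "\"n/a\"", "\"unknown\"").as("json"))
  .select(from_json(col("json"), df.schema).as("data"))
  .select("data.*")

cleaned.show(20, false)
```

Because the pattern includes the quotes, only values exactly equal to `n/a` are replaced; substrings inside longer values are left alone. The trade-off is that a field *named* `n/a` would also match, and the round trip through JSON costs extra serialization per row.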