Lei*_*itz 5 json scala apache-spark
我使用Spark 2.4.3 和 Scala 2.11
下面是 DataFrame 列中我当前的 JSON 字符串。我尝试使用函数将其模式存储JSON string在另一列中schema_of_json。但它的抛出低于错误。我该如何解决这个问题?
{
"company": {
"companyId": "123",
"companyName": "ABC"
},
"customer": {
"customerDetails": {
"customerId": "CUST-100",
"customerName": "CUST-AAA",
"status": "ACTIVE",
"phone": {
"phoneDetails": {
"home": {
"phoneno": "666-777-9999"
},
"mobile": {
"phoneno": "333-444-5555"
}
}
}
},
"address": {
"loc": "NORTH",
"adressDetails": [
{
"street": "BBB",
"city": "YYYYY",
"province": "AB",
"country": "US"
},
{
"street": "UUU",
"city": "GGGGG",
"province": "NB",
"country": "US"
}
]
}
}
}
Run Code Online (Sandbox Code Playgroud)
代码:
val df = spark.read.textFile("./src/main/resources/json/company.txt")
df.printSchema()
df.show()
root
|-- value: string (nullable = true)
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"company":{"companyId":"123","companyName":"ABC"},"customer":{"customerDetails":{"customerId":"CUST-100","customerName":"CUST-AAA","status":"ACTIVE","phone":{"phoneDetails":{"home":{"phoneno":"666-777-9999"},"mobile":{"phoneno":"333-444-5555"}}}},"address":{"loc":"NORTH","adressDetails":[{"street":"BBB","city":"YYYYY","province":"AB","country":"US"},{"street":"UUU","city":"GGGGG","province":"NB","country":"US"}]}}}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
df.withColumn("jsonSchema",schema_of_json(col("value")))
Run Code Online (Sandbox Code Playgroud)
错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`value`)' due to data type mismatch: The input json should be a string literal and not null; however, got `value`.;;
'Project [value#0, schemaofjson(value#0) AS jsonSchema#10]
+- Project [value#0]
+- Relation[value#0] text
Run Code Online (Sandbox Code Playgroud)
我找到的解决方案是将如下列传递value给函数schema_of_json。
df.withColumn("jsonSchema",schema_of_json(df.select(col("value")).first.getString(0)))
Run Code Online (Sandbox Code Playgroud)
礼貌:
JSON 格式的 Spark DataFrame 列上的隐式架构发现