I have a file with a bunch of columns, one of which, called jsonstring, is of string type and holds a JSON string. Say the format looks like this:
{
"key1": "value1",
"key2": {
"level2key1": "level2value1",
"level2key2": "level2value2"
}
}
I want to parse this column so that jsonstring.key1 and jsonstring.key2.level2key1 return value1 and level2value1.
How can I do that in Scala or Spark SQL?
In Spark 2.2 you can use the from_json function, which does the JSON parsing for you:

from_json(e: Column, schema: String, options: Map[String, String]): Column — parses a column containing a JSON string into a StructType with the specified schema, or an ArrayType of StructTypes.

Flattening the nested columns with * (star) is supported, which seems to be the best solution.
// the input dataset (just a single JSON blob)
val jsonstrings = Seq("""{
"key1": "value1",
"key2": {
"level2key1": "level2value1",
"level2key2": "level2value2"
}
}""").toDF("jsonstring")
// define the schema of JSON messages
import org.apache.spark.sql.types._
val key2schema = new StructType()
.add($"level2key1".string)
.add($"level2key2".string)
val schema = new StructType()
.add($"key1".string)
.add("key2", key2schema)
scala> schema.printTreeString
root
|-- key1: string (nullable = true)
|-- key2: struct (nullable = true)
| |-- level2key1: string (nullable = true)
| |-- level2key2: string (nullable = true)
import org.apache.spark.sql.functions.from_json
val messages = jsonstrings
  .select(from_json($"jsonstring", schema) as "json")
  .select("json.*") // <-- flattening nested fields
scala> messages.show(truncate = false)
+------+---------------------------+
|key1 |key2 |
+------+---------------------------+
|value1|[level2value1,level2value2]|
+------+---------------------------+
scala> messages.select("key1", "key2.*").show(truncate = false)
+------+------------+------------+
|key1 |level2key1 |level2key2 |
+------+------------+------------+
|value1|level2value1|level2value2|
+------+------------+------------+
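The question also asks about Spark SQL. A minimal sketch (not part of the original answers) using the built-in get_json_object function, which extracts single values by a JSONPath-like path and needs no schema; the view name events is just an example:

jsonstrings.createOrReplaceTempView("events")
spark.sql("""
  SELECT get_json_object(jsonstring, '$.key1')            AS key1,
         get_json_object(jsonstring, '$.key2.level2key1') AS level2key1
  FROM events
""").show(truncate = false)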
Alternatively, you can use withColumn + udf + json4s:
import org.json4s.{DefaultFormats, MappingException}
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.functions._
// parse the JSON string with json4s and pull out the two values of interest
def getJsonContent(jsonstring: String): (String, String) = {
  implicit val formats = DefaultFormats
  val parsedJson = parse(jsonstring)
  val value1 = (parsedJson \ "key1").extract[String]
  val level2value1 = (parsedJson \ "key2" \ "level2key1").extract[String]
  (value1, level2value1)
}
// wrap the function in a UDF so it can be applied to a DataFrame column
val getJsonContentUDF = udf((jsonstring: String) => getJsonContent(jsonstring))
df.withColumn("parsedJson", getJsonContentUDF(df("jsonstring")))
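The UDF returns a Scala tuple, which Spark stores as a struct column with fields _1 and _2, so the result can be split into plain columns afterwards. A small sketch of that follow-up step (the output column names are assumptions):

// split the struct produced by the UDF into ordinary columns
df.withColumn("parsedJson", getJsonContentUDF(df("jsonstring")))
  .select(
    col("parsedJson._1").as("key1"),        // value1
    col("parsedJson._2").as("level2key1"))  // level2value1
  .show(truncate = false)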