In my Spark (2.2) DataFrame, every row contains a JSON string:
df.head()
//output
//[{"key":"111","event_name":"page-visited","timestamp":1517814315}]
df.show()
//output
//+--------------+
//| value|
//+--------------+
//|{"key":"111...|
//|{"key":"222...|
I want to parse each JSON row into separate columns to get this result:
key event_name timestamp
111 page-visited 1517814315
...
I tried this approach, but it doesn't give me the expected result:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("key", StringType, true),
  StructField("event_name", StringType, true),
  StructField("timestamp", IntegerType, true)
))
val result = df.withColumn("value", from_json($"value", schema))
and:
result.printSchema()
root
|-- value: struct (nullable = true)
| |-- key: string (nullable = true)
| |-- event_name: string (nullable = true)
| |-- timestamp: integer (nullable = true)
whereas it should be:
result.printSchema()
root
|-- key: string (nullable = true)
|-- event_name: string (nullable = true)
|-- timestamp: integer (nullable = true)
You can use select($"value.*") at the end to expand the elements of the struct column into individual top-level columns:
val result = df.withColumn("value", from_json($"value", schema)).select($"value.*")
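For reference, here is a minimal self-contained sketch of the whole pipeline. It assumes a local SparkSession (the names `FromJsonExample` and the sample rows are illustrative, not from the original question):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

object FromJsonExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("from_json-example")
      .getOrCreate()
    import spark.implicits._

    // One JSON string per row, as in the question
    val df = Seq(
      """{"key":"111","event_name":"page-visited","timestamp":1517814315}""",
      """{"key":"222","event_name":"page-visited","timestamp":1517814316}"""
    ).toDF("value")

    val schema = StructType(Seq(
      StructField("key", StringType, true),
      StructField("event_name", StringType, true),
      StructField("timestamp", IntegerType, true)
    ))

    // Parse the JSON column into a struct, then flatten the struct
    // into top-level columns with select($"value.*")
    val result = df
      .withColumn("value", from_json($"value", schema))
      .select($"value.*")

    result.printSchema()  // root with key, event_name, timestamp at top level
    result.show()

    spark.stop()
  }
}
```

Note that from_json returns null for rows whose JSON does not match the schema, so a quick result.show() is a useful sanity check after parsing.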