小编Syn*_*tax的帖子

用动态模式触发from_json

我正在尝试使用Spark处理具有可变结构(嵌套JSON)的JSON数据。输入JSON数据可能非常大,每行超过1000个键,而一批可能超过20 GB。整个批次已从30个数据源生成,每个JSON的“ key2”可用于标识源,并且每个源的结构都是预定义的。

处理此类数据的最佳方法是什么?我曾尝试使用from_json,如下所示,但它仅适用于固定模式,要首先使用它,我需要根据每个源对数据进行分组,然后应用该模式。由于数据量大,我的首选是仅扫描数据一次,并根据预定义的架构从每个源中提取所需的值。

import org.apache.spark.sql.types._ 
import spark.implicits._

val data = sc.parallelize(
    """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
    :: Nil)
val df = data.toDF


val schema = (new StructType)
    .add("key1", StringType)
    .add("key2", StringType)
    .add("key3", (new StructType)
    .add("key3_k1", StringType))


df.select(from_json($"value",schema).as("json_str"))
  .select($"json_str.key3.key3_k1").collect
res17: Array[org.apache.spark.sql.Row] = Array([xxx])
Run Code Online (Sandbox Code Playgroud)

json apache-spark apache-spark-sql

7
推荐指数
2
解决办法
1万
查看次数

标签 统计

apache-spark ×1

apache-spark-sql ×1

json ×1