我正在尝试使用Spark处理具有可变结构(嵌套JSON)的JSON数据。输入JSON数据可能非常大,每行超过1000个键,而一批可能超过20 GB。整个批次已从30个数据源生成,每个JSON的“ key2”可用于标识源,并且每个源的结构都是预定义的。
处理此类数据的最佳方法是什么?我曾尝试使用from_json,如下所示,但它仅适用于固定模式,要首先使用它,我需要根据每个源对数据进行分组,然后应用该模式。由于数据量大,我的首选是仅扫描数据一次,并根据预定义的架构从每个源中提取所需的值。
import org.apache.spark.sql.types._
import spark.implicits._
val data = sc.parallelize(
"""{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
:: Nil)
val df = data.toDF
val schema = (new StructType)
.add("key1", StringType)
.add("key2", StringType)
.add("key3", (new StructType)
.add("key3_k1", StringType))
df.select(from_json($"value",schema).as("json_str"))
.select($"json_str.key3.key3_k1").collect
res17: Array[org.apache.spark.sql.Row] = Array([xxx])
Run Code Online (Sandbox Code Playgroud)