用动态模式触发from_json

Syn*_*tax 7 json apache-spark apache-spark-sql

我正在尝试使用Spark处理具有可变结构(嵌套JSON)的JSON数据。输入JSON数据可能非常大,每行超过1000个键,而一批可能超过20 GB。整个批次已从30个数据源生成,每个JSON的“ key2”可用于标识源,并且每个源的结构都是预定义的。

处理此类数据的最佳方法是什么?我曾尝试使用from_json,如下所示,但它仅适用于固定模式,要首先使用它,我需要根据每个源对数据进行分组,然后应用该模式。由于数据量大,我的首选是仅扫描数据一次,并根据预定义的架构从每个源中提取所需的值。

import org.apache.spark.sql.types._ 
import spark.implicits._

val data = sc.parallelize(
    """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
    :: Nil)
val df = data.toDF


val schema = (new StructType)
    .add("key1", StringType)
    .add("key2", StringType)
    .add("key3", (new StructType)
    .add("key3_k1", StringType))


df.select(from_json($"value",schema).as("json_str"))
  .select($"json_str.key3.key3_k1").collect
res17: Array[org.apache.spark.sql.Row] = Array([xxx])
Run Code Online (Sandbox Code Playgroud)

Wad*_*sen 12

这只是对@Ramesh Maharjan 答案的重述,但使用了更现代的 Spark 语法。

我发现这种方法潜伏在DataFrameReader其中,它允许您将 JSON 字符串从 a 解析Dataset[String]为任意字符串,DataFrame并利用 Sparkspark.read.json("filepath")在直接从 JSON 文件中读取时为您提供的相同模式推理。每行的架构可以完全不同。

def json(jsonDataset: Dataset[String]): DataFrame
Run Code Online (Sandbox Code Playgroud)

用法示例:

val jsonStringDs = spark.createDataset[String](
  Seq(
      ("""{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}"""),
      ("""{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}""")))

jsonStringDs.show

jsonStringDs:org.apache.spark.sql.Dataset[String] = [value: string]
+----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                 
|
+----------------------------------------------------------------------------------------------------------------------+
|{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}|
|{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}  |
+----------------------------------------------------------------------------------------------------------------------+


val df = spark.read.json(jsonStringDs)
df.show(false)

df:org.apache.spark.sql.DataFrame = [CEO: string, address: struct ... 6 more fields]
+----------+------------------+-------------+---------+--------+------------+------+------------+
|CEO       |address           |employeeCount|firstname|lastname|marketCap   |name  |revenue     |
+----------+------------------+-------------+---------+--------+------------+------+------------+
|null      |[London,Baker,121]|null         |Sherlock |Holmes  |null        |null  |null        |
|Jeff Bezos|null              |500000       |null     |null    |817117000000|Amazon|177900000000|
+----------+------------------+-------------+---------+--------+------------+------+------------+
Run Code Online (Sandbox Code Playgroud)

该方法可从 Spark 2.2.0 获得:http ://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(jsonDataset: org. apache.spark.sql.Dataset[String]):org.apache.spark.sql.DataFrame


Ram*_*jan 3

如果您有问题中提到的数据

val data = sc.parallelize(
    """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
    :: Nil)
Run Code Online (Sandbox Code Playgroud)

您不需要创建schemajson数据Spark sqlschema可以从json 字符串推断。你只需要SQLContext.read.json像下面这样使用

val df = sqlContext.read.json(data)
Run Code Online (Sandbox Code Playgroud)

这将为您提供上面使用的 rdd 数据,schema如下所示

root
 |-- key1: string (nullable = true)
 |-- key2: string (nullable = true)
 |-- key3: struct (nullable = true)
 |    |-- key3_k1: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

你也可以select key3_k1

df2.select("key3.key3_k1").show(false)
//+-------+
//|key3_k1|
//+-------+
//|key3_v1|
//+-------+
Run Code Online (Sandbox Code Playgroud)

dataframe您可以根据需要操纵。我希望答案有帮助