Implicit schema discovery on a JSON-formatted Spark DataFrame column

ngc*_*359 5 scala apache-spark

I am writing an ETL Spark (2.4) job in Scala that reads semicolon-delimited CSV files from S3 using a glob pattern. The data is loaded into a DataFrame and contains a column (named custom) holding a JSON-formatted string (with multiple levels of nesting). The goal is to automatically infer the schema from that column, so the data can be structured for the write sink of Parquet files on S3.
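For reference, the read described above can be sketched like this (the bucket path, glob, and header option are hypothetical placeholders, not from the original job):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("etl").getOrCreate()

// Hypothetical S3 glob; the real path and options come from the actual job.
val df = spark.read
  .option("sep", ";")               // semicolon-delimited CSV
  .option("header", "true")         // assumption: files carry a header row
  .csv("s3a://some-bucket/input/*.csv")
  .filter(col("custom").isNotNull)  // drop rows with a null `custom`, as described below
```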

This post (How to query JSON data column using Spark DataFrames?) suggests that schema_of_json in Spark 2.4 can infer the schema from a JSON-formatted column or string.

Here is what I tried:

val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

df.withColumn(
    "nestedCustom",
    from_json(col("custom"), jsonSchema, Map[String, String]())
)

But the above does not work and throws this exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]

Keep in mind that I am filtering out null values of custom for this DataFrame.



And a sample of the JSON:

{
  "lvl1":  {
    "lvl2a": {
      "lvl3a":   {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    },
    "lvl2b":   {
      "lvl3a":   {
        "lvl4a": "random_data"
      },
      "lvl3b":  {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    }
  }
}

小智 1

The column custom is not a valid input to schema_of_json:

scala> spark.sql("SELECT schema_of_json(struct(1, 2))")
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(named_struct('col1', 1, 'col2', 2))' due to data type mismatch: argument 1 requires string type, however, 'named_struct('col1', 1, 'col2', 2)' is of struct<col1:int,col2:int> type.; line 1 pos 7;
...

You should go back to your data and make sure that custom really is a String.
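In Spark 2.4 the analyzer only accepts a foldable string literal as the argument to `schema_of_json`, so one way out is to feed it a single sample document wrapped in `lit(...)`. A minimal sketch (the inline sample JSON is a stand-in for one non-null row of `custom`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, lit, schema_of_json}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Stand-in for one row of `custom`; in the real job this would be sampled from the DataFrame.
val sample = """{"lvl1": {"lvl2a": {"lvl3a": {"lvl4a": "x", "lvl4b": "y"}}}}"""
val df = Seq(sample).toDF("custom")

// A literal argument is foldable, so the analyzer accepts it; col("custom") is not foldable.
val parsed = df.withColumn("nestedCustom", from_json(col("custom"), schema_of_json(lit(sample))))
parsed.printSchema()
```

The caveat is that the inferred schema only covers fields present in the one sampled document.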

  • I found a solution (it works), but it feels hacky: `val jsonSchema: String = df.select(schema_of_json(df.select(col("custom")).first.getString(0))).as[String].first` Do you think there is a better way to achieve this by addressing the column directly, as the function signature advertises? (6 upvotes)
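As an alternative to sampling a single row (a hedged sketch under my own assumptions, not from the thread): `spark.read.json` accepts a `Dataset[String]` and infers a schema over every document in the column, which avoids missing fields that any one sample may lack, at the cost of an extra pass over the data:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Toy stand-in for the real DataFrame; note the two rows carry different fields.
val df = Seq(
  """{"lvl1": {"lvl2a": {"lvl3a": {"lvl4a": "x"}}}}""",
  """{"lvl1": {"lvl2b": {"lvl3b": {"lvl4b": "y"}}}}"""
).toDF("custom")

// Infer one merged schema over all rows of `custom`, not just the first one.
val jsonStrings: Dataset[String] = df.select(col("custom")).as[String]
val inferred = spark.read.json(jsonStrings).schema

val parsed = df.withColumn("nestedCustom", from_json(col("custom"), inferred))
```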