Spark from_json - 如何处理损坏的记录

nok*_*kse 4 apache-spark apache-spark-sql pyspark

from_json()我有一个用例,我从表中读取数据并通过指定架构将字符串列解析为另一个字符串列:

from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("FromJsonExample").getOrCreate()

input_df = spark.sql("SELECT * FROM input_table")

json_schema = "struct<field1:string, field2:integer>"

output_df = input_df.withColumn("parsed_json", from_json(col("json_column"), json_schema))

output_df.show()
Run Code Online (Sandbox Code Playgroud)

我正在尝试实现本专栏的错误处理。如果记录损坏,我想将该列保留为空,但我想要另一个错误列,其中包含任何可以表明我的记录已损坏的内容,以便我可以稍后过滤此记录。阅读文档后,有点不清楚这个功能支持什么。文档中指出,您可以将“选项”配置为与 json 数据源相同(“控制解析的选项。接受与 json 数据源相同的选项”),但直到尝试将“PERMISSIVE”模式与“ columnNameOfCorruptRecord”,如果记录损坏,它不会生成新列。搜索并没有得到很多结果,但据我所知(尽管未经测试或确认)Databricks 文档指定您可以使用此设置来生成错误列(https://docs.databricks.com/sql/language-手册/functions/from_json.html)。

需要明确的是,我没有使用 Databricks,但据我所知,该公司是由 Apache Spark 基金会创立的,因此我的期望是使用/提供可以在任何地方使用的相同工具。另外,我对使用“from_json”的特定用例感兴趣,而不是使用“read.json()”读取数据并在那里配置选项,因为这在我的用例中是不可能的。

仍然存在以下问题:

  1. 如何处理“from_json”方法的错误?如果发生解析错误,是否有另一种方法来创建列。
  2. 有没有办法删除格式错误的记录,因为“from_json() 的“选项”似乎不支持“DROPMALFORMED”配置。事后通过空列进行检查是不可能的,因为在处理之前它可能已经为空。

aru*_*rne 5

示例数据帧 (df_1):

+---------------------------+
|json_column                |
+---------------------------+
|{'field1':'A', 'field2':1} |
|{'field1':'B', 'field2':2} |
|{'field1': C, 'field2':'3'}|
+---------------------------+
Run Code Online (Sandbox Code Playgroud)

导入必要的包:

from pyspark.sql.functions import from_json, col, when
Run Code Online (Sandbox Code Playgroud)
  1. 使用columnNameOfCorruptRecord选项来跟踪错误记录
json_options = {"columnNameOfCorruptRecord":"corrupt_json"}
json_schema = "struct<field1:string, field2:integer, corrupt_json:string>"

df_2 = input_df.withColumn(
    "parsed_json",
    from_json(
        col("json_column"),
        schema = json_schema,
        options = json_options
    )
)
Run Code Online (Sandbox Code Playgroud)

JSON 记录损坏

  1. 创建一个新列corrupt_json并从中删除 Corrupt_json 字段parsed_json
df_3 = df_2 \
    .withColumn("corrupt_json", col("parsed_json.corrupt_json")) \
    .withColumn("parsed_json", col("parsed_json").dropFields("corrupt_json"))
Run Code Online (Sandbox Code Playgroud)
  1. parsed_jsonnull值更新损坏的记录
df_4 = df_3.withColumn("parsed_json", when(col("corrupt_json").isNotNull(), None).otherwise(col("parsed_json")))
Run Code Online (Sandbox Code Playgroud)
  1. 删除损坏的记录
df_5 = df_4.filter(col("parsed_json").isNotNull()).drop("parsed_json","corrupt_json")
df_5.show()
Run Code Online (Sandbox Code Playgroud)

输出

+--------------------------+
|json_column               |
+--------------------------+
|{'field1':'A', 'field2':1}|
|{'field1':'B', 'field2':2}|
+--------------------------+
Run Code Online (Sandbox Code Playgroud)