nok*_*kse 4 apache-spark apache-spark-sql pyspark
from_json()我有一个用例,我从表中读取数据并通过指定架构将字符串列解析为另一个字符串列:
from pyspark.sql.functions import from_json, col
spark = SparkSession.builder.appName("FromJsonExample").getOrCreate()
input_df = spark.sql("SELECT * FROM input_table")
json_schema = "struct<field1:string, field2:integer>"
output_df = input_df.withColumn("parsed_json", from_json(col("json_column"), json_schema))
output_df.show()
Run Code Online (Sandbox Code Playgroud)
我正在尝试实现本专栏的错误处理。如果记录损坏,我想将该列保留为空,但我想要另一个错误列,其中包含任何可以表明我的记录已损坏的内容,以便我可以稍后过滤此记录。阅读文档后,有点不清楚这个功能支持什么。文档中指出,您可以将“选项”配置为与 json 数据源相同(“控制解析的选项。接受与 json 数据源相同的选项”),但直到尝试将“PERMISSIVE”模式与“ columnNameOfCorruptRecord”,如果记录损坏,它不会生成新列。搜索并没有得到很多结果,但据我所知(尽管未经测试或确认)Databricks 文档指定您可以使用此设置来生成错误列(https://docs.databricks.com/sql/language-手册/functions/from_json.html)。
需要明确的是,我没有使用 Databricks,但据我所知,该公司是由 Apache Spark 基金会创立的,因此我的期望是使用/提供可以在任何地方使用的相同工具。另外,我对使用“from_json”的特定用例感兴趣,而不是使用“read.json()”读取数据并在那里配置选项,因为这在我的用例中是不可能的。
仍然存在以下问题:
示例数据帧 (df_1):
+---------------------------+
|json_column |
+---------------------------+
|{'field1':'A', 'field2':1} |
|{'field1':'B', 'field2':2} |
|{'field1': C, 'field2':'3'}|
+---------------------------+
Run Code Online (Sandbox Code Playgroud)
导入必要的包:
from pyspark.sql.functions import from_json, col, when
Run Code Online (Sandbox Code Playgroud)
columnNameOfCorruptRecord选项来跟踪错误记录json_options = {"columnNameOfCorruptRecord":"corrupt_json"}
json_schema = "struct<field1:string, field2:integer, corrupt_json:string>"
df_2 = input_df.withColumn(
"parsed_json",
from_json(
col("json_column"),
schema = json_schema,
options = json_options
)
)
Run Code Online (Sandbox Code Playgroud)
corrupt_json并从中删除 Corrupt_json 字段parsed_jsondf_3 = df_2 \
.withColumn("corrupt_json", col("parsed_json.corrupt_json")) \
.withColumn("parsed_json", col("parsed_json").dropFields("corrupt_json"))
Run Code Online (Sandbox Code Playgroud)
parsed_json用null值更新损坏的记录df_4 = df_3.withColumn("parsed_json", when(col("corrupt_json").isNotNull(), None).otherwise(col("parsed_json")))
Run Code Online (Sandbox Code Playgroud)
df_5 = df_4.filter(col("parsed_json").isNotNull()).drop("parsed_json","corrupt_json")
df_5.show()
Run Code Online (Sandbox Code Playgroud)
输出
+--------------------------+
|json_column |
+--------------------------+
|{'field1':'A', 'field2':1}|
|{'field1':'B', 'field2':2}|
+--------------------------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
753 次 |
| 最近记录: |