如何在 spark 2.X 中验证 Json 模式?

Dig*_*ure 5 json scala apache-spark spark-streaming pyspark

使用Spark流(用Scala编写)从 Kafka 读取消息。消息都是 Json 格式的字符串。

在局部变量中定义预期的模式,expectedSchema 然后将 RDD 中的字符串解析为 Json

spark.sqlContext.read.schema(schema).json(rdd.toDS())
Run Code Online (Sandbox Code Playgroud)

问题: Spark 将处理所有记录/行,只要它有一些我尝试读取的字段,即使输入行(字符串)的实际 Json 格式(即架构)与我的expectedSchema.

假设预期的模式看起来像这样(在 Json 中):{"a": 1,"b": 2, "c": 3} 并且输入行看起来像这样: {"a": 1, "c": 3} Spark 将处理输入而不会失败。

我尝试使用此处描述的解决方案:How do I apply schema with nullable = false to json reading

assert(readJson.schema == expectedSchema)永远不会失败,即使我故意发送带有错误 Json 模式的输入行也是如此。

  1. 有没有办法让我验证给定输入行的实际模式与我预期的模式相匹配?

  2. 有没有办法让我插入一个空值来“填充”“损坏”模式行中缺少的字段?