Pyspark:从 JSON 文件创建模式

Mor*_*tz 5 jsonschema apache-spark-sql pyspark

我正在处理来自很长的嵌套 JSON 文件的数据。问题是,这些文件的结构并不总是相同,因为其中一些文件缺少其他文件的列。我想从包含所有列的空 JSON 文件创建自定义架构。如果我稍后将 JSON 文件读入这个预定义的架构中,则不存在的列将填充空值(至少是计划)。到目前为止我做了什么:

  1. 将测试 JSON(不包含所有预期的列)加载到数据框中
  2. 将其架构写入 JSON 文件
  3. 在文本编辑器中打开此 JSON 文件并手动添加缺少的列

接下来我想做的是通过将 JSON 文件读入我的代码来创建一个新模式,但我对合成很困难。我可以直接从文件本身读取架构吗?我努力了

schemaFromJson = StructType.fromJson(json.loads('filepath/spark-schema.json'))
Run Code Online (Sandbox Code Playgroud)

但它给了我 TypeError: init () 缺少 2 个必需的位置参数:'doc' 和 'pos'

知道我当前的代码有什么问题吗?多谢

编辑:我遇到了这个链接sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield。第 7 章几乎描述了我遇到的问题。我只是不明白如何解析我手动增强为 schemaFromJson = StructType.fromJson(json.loads(schema.json)) 的 json 文件。

当我做:

jsonDF = spark.read.json(filesToLoad)
schema = jsonDF.schema.json()
schemaNew = StructType.fromJson(json.loads(schema))
jsonDF2 = spark.read.schema(schemaNew).json(filesToLoad)
Run Code Online (Sandbox Code Playgroud)

代码运行完毕,但显然没有用,因为 jsonDF 和 jsonDF2 确实具有相同的内容/架构。我想要实现的目标是向“schema”添加一些列,然后这些列将反映在“schemaNew”中。

Mor*_*tz 13

我想我明白了。Schemapath 包含已经增强的架构:

schemapath = '/path/spark-schema.json'
with open(schemapath) as f:
   d = json.load(f)
   schemaNew = StructType.fromJson(d)
   jsonDf2 = spark.read.schema(schmaNew).json(filesToLoad)
   jsonDF2.printSchema()
Run Code Online (Sandbox Code Playgroud)

  • 你好@Moritz,希望你做得很好。我正在尝试实施这个解决方案。但是,我在“schemaNew = StructType.fromJson(d)”中收到“KeyError: 'fields'”错误。你能帮我一下吗?`Schemapath 包含已经增强的架构`。你到底在这儿经过什么。请帮忙 (2认同)