How do I create a schema for a nested JSON column in PySpark?

nag*_*ish 3 schema json apache-spark pyspark pyspark-schema

I have a Parquet file with multiple columns. Two of them contain JSON/struct data, but their type is string. There can be any number of array_elements.

{
  "addressline": [
    {
      "array_element": "F748DK’8U1P9’2ZLKXE"
    },
    {
      "array_element": "’O’P0BQ04M-"
    },
    {
      "array_element": "’fvrvrWEM-"
    }
  ],
  "telephone": [
    {
      "array_element": {
        "locationtype": "8.PLT",
        "countrycode": null,
        "phonenumber": "000000000",
        "phonetechtype": "1.PTT",
        "countryaccesscode": null,
        "phoneremark": null
      }
    }
  ]
}

How can I create a schema to handle these columns in PySpark?


Zyg*_*ygD 5

Treating the example you provided as a string, I created this DataFrame:

from pyspark.sql import functions as F, types as T

# `spark` is assumed to be an existing SparkSession (for example, the one provided by the pyspark shell)
df = spark.createDataFrame([('{"addressline":[{"array_element":"F748DK’8U1P9’2ZLKXE"},{"array_element":"’O’P0BQ04M-"},{"array_element":"’fvrvrWEM-"}],"telephone":[{"array_element":{"locationtype":"8.PLT","countrycode":null,"phonenumber":"000000000","phonetechtype":"1.PTT","countryaccesscode":null,"phoneremark":null}}]}',)], ['c1'])

This is the schema to apply to that column:

schema = T.StructType([
    T.StructField('addressline', T.ArrayType(T.StructType([
        T.StructField('array_element', T.StringType())
    ]))),
    T.StructField('telephone', T.ArrayType(T.StructType([
        T.StructField('array_element', T.StructType([
            T.StructField('locationtype', T.StringType()),
            T.StructField('countrycode', T.StringType()),
            T.StructField('phonenumber', T.StringType()),
            T.StructField('phonetechtype', T.StringType()),
            T.StructField('countryaccesscode', T.StringType()),
            T.StructField('phoneremark', T.StringType()),
        ]))
    ])))
])

The result of passing the schema to the from_json function:

df = df.withColumn('c1', F.from_json('c1', schema))

# truncate=False prints the full cell instead of cutting it off after 20 characters
df.show(truncate=False)
# +-------------------------------------------------------------------------------------------------------+
# |c1                                                                                                     |
# +-------------------------------------------------------------------------------------------------------+
# |{[{F748DK’8U1P9’2ZLKXE}, {’O’P0BQ04M-}, {’fvrvrWEM-}], [{{8.PLT, null, 000000000, 1.PTT, null, null}}]}|
# +-------------------------------------------------------------------------------------------------------+

df.printSchema()
# root
#  |-- c1: struct (nullable = true)
#  |    |-- addressline: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- array_element: string (nullable = true)
#  |    |-- telephone: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- array_element: struct (nullable = true)
#  |    |    |    |    |-- locationtype: string (nullable = true)
#  |    |    |    |    |-- countrycode: string (nullable = true)
#  |    |    |    |    |-- phonenumber: string (nullable = true)
#  |    |    |    |    |-- phonetechtype: string (nullable = true)
#  |    |    |    |    |-- countryaccesscode: string (nullable = true)
#  |    |    |    |    |-- phoneremark: string (nullable = true)