Pyspark:将列中的json爆炸为多列

sji*_*han 10 python apache-spark apache-spark-sql pyspark

数据看起来像这样 -

+-----------+-----------+-----------------------------+
|         id|      point|                         data|
+-----------------------------------------------------+
|        abc|          6|{"key1":"124", "key2": "345"}|
|        dfl|          7|{"key1":"777", "key2": "888"}|
|        4bd|          6|{"key1":"111", "key2": "788"}|
Run Code Online (Sandbox Code Playgroud)

我试图将其分解为以下格式.

+-----------+-----------+-----------+-----------+
|         id|      point|       key1|       key2|
+------------------------------------------------
|        abc|          6|        124|        345|
|        dfl|          7|        777|        888|
|        4bd|          6|        111|        788|
Run Code Online (Sandbox Code Playgroud)

explode函数将数据框分解为多行.但这不是理想的解决方案.

注意:此解决方案不能回答我的问题. PySpark在列中"爆炸"字典

Ram*_*jan 17

只要您使用Spark 2.1或更高版本,就pyspark.sql.functions.from_json可以获得所需的结果,但您需要先定义所需的结果schema

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('key1', StringType(), True),
        StructField('key2', StringType(), True)
    ]
)

df.withColumn("data", from_json("data", schema))\
    .select(col('id'), col('point'), col('data.*'))\
    .show()
Run Code Online (Sandbox Code Playgroud)

哪个应该给你

+---+-----+----+----+
| id|point|key1|key2|
+---+-----+----+----+
|abc|    6| 124| 345|
|df1|    7| 777| 888|
|4bd|    6| 111| 788|
+---+-----+----+----+
Run Code Online (Sandbox Code Playgroud)

  • 您应该能够使用以下内容从数据字段中提取 JSON 的架构... `schema = Spark.read.json(df.rdd.map(lambda row: row.data)).schema` (2认同)

jxc*_*jxc 8

正如@pault 所建议的,数据字段是一个string字段。由于行上的 JSON 字符串中的键是相同的(即“key1”、“key2”),因此您也可以使用(根据文档,json_tuple()此函数是版本1.6中的新增功能)

from pyspark.sql import functions as F

df.select('id', 'point', F.json_tuple('data', 'key1', 'key2').alias('key1', 'key2')).show()
Run Code Online (Sandbox Code Playgroud)

下面是我原来的帖子:如果原始表来自,因此该字段不是 python 数据结构,这很可能是错误的。df.show(truncate=False)data

由于您已将数据分解为行,因此我认为该列data是 Python 数据结构而不是字符串:

from pyspark.sql import functions as F

df.select('id', 'point', F.col('data').getItem('key1').alias('key1'), F.col('data')['key2'].alias('key2')).show()
Run Code Online (Sandbox Code Playgroud)