I have a dataframe in PySpark with 3 columns - json, date and object_id:
-----------------------------------------------------------------------------------------
|json                                                               |date      |object_id|
-----------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-01|xyz123   |
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-02|xyz123   |
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}            |2020-08-03|xyz123   |
-----------------------------------------------------------------------------------------
Now I have a list of variables: [a.c.60, a.n.60, a.d, g.h]. I need to extract only these variables from the json column of the dataframe above and add them as columns, each with its respective value.
So in the end, the dataframe should look like this:
-------------------------------------------------------------------------------------------------------
|json                                                     |date      |object_id|a.c.60|a.n.60|a.d |g.h |
-------------------------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-01|xyz123    |0     |null  |0.01|null|
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-02|xyz123    |null  |0     |0.01|null|
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}  |2020-08-03|xyz123    |null  |null  |0.02|0   |
-------------------------------------------------------------------------------------------------------
Please help me get this resulting dataframe. The main problem I'm facing is that the incoming json data has no fixed structure. The json can be any nested shape, but I only ever need to extract the four variables given above. I achieved this in Pandas by flattening the json string and then extracting the 4 variables, but it's proving harder in Spark.
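For reference, a minimal sketch of the Pandas flattening described above (pdf is an assumed Pandas dataframe with the same json column, holding valid double-quoted JSON strings; pd.json_normalize does the flattening):

import json
import pandas as pd

# Assumed input: a Pandas dataframe `pdf` with a string column "json".
flat = pd.json_normalize([json.loads(s) for s in pdf["json"]], sep=".")
wanted = ["a.c.60", "a.n.60", "a.d", "g.h"]
# Any variable missing from a given batch becomes an all-null column.
for col in wanted:
    if col not in flat.columns:
        flat[col] = None
result = pd.concat([pdf.reset_index(drop=True), flat[wanted]], axis=1)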
There are two ways to do it:
Using the get_json_object function, like this:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())
df3 = df.select(F.get_json_object(F.col("value"), "$.a.c.60").alias("a_c_60"),
                F.get_json_object(F.col("value"), "$.a.n.60").alias("a_n_60"),
                F.get_json_object(F.col("value"), "$.a.d").alias("a_d"),
                F.get_json_object(F.col("value"), "$.g.h").alias("g_h"))
This gives:
>>> df3.show()
+------+------+----+----+
|a_c_60|a_n_60| a_d| g_h|
+------+------+----+----+
| 0| null|0.01|null|
| null| 0|0.01|null|
| null| null|null| 0|
+------+------+----+----+
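A minimal sketch of applying this to the original dataframe while keeping the json, date and object_id columns (df_src is an assumed name, and the cast to double is an assumption, since get_json_object always returns strings):

import pyspark.sql.functions as F

# Hypothetical source dataframe `df_src` with columns json, date, object_id.
paths = {"a_c_60": "$.a.c.60",
         "a_n_60": "$.a.n.60",
         "a_d": "$.a.d",
         "g_h": "$.g.h"}
result = df_src
for name, path in paths.items():
    # get_json_object returns a string, so cast to the expected numeric type
    result = result.withColumn(name, F.get_json_object("json", path).cast("double"))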
Using the from_json function to convert the JSON into a struct and then extract individual values from it - this is likely more efficient than JSON paths:

from pyspark.sql.types import *
import pyspark.sql.functions as F

aSchema = StructType([
    StructField("c", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("n", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("d", DoubleType(), True),
])
gSchema = StructType([
    StructField("h", DoubleType(), True)
])
schema = StructType([
    StructField("a", aSchema, True),
    StructField("g", gSchema, True)
])

df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())
df2 = df.select(F.from_json("value", schema=schema).alias('data')).select('data.*')
df2.select(df2.a.c['60'], df2.a.n['60'], df2.a.d, df2.g.h).show()
This gives:
+------+------+----+----+
|a.c.60|a.n.60| a.d| g.h|
+------+------+----+----+
| 0.0| null|0.01|null|
| null| 0.0|0.01|null|
| null| null|null| 0.0|
+------+------+----+----+
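And a minimal sketch of the from_json variant against the original dataframe, reusing the schema defined above (again, df_src and the column aliases are assumptions):

import pyspark.sql.functions as F

parsed = df_src.withColumn("data", F.from_json("json", schema))
result = parsed.select("json", "date", "object_id",
                       parsed.data.a.c["60"].alias("a_c_60"),
                       parsed.data.a.n["60"].alias("a_n_60"),
                       parsed.data.a.d.alias("a_d"),
                       parsed.data.g.h.alias("g_h"))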
| 归档时间: |
|
| 查看次数: |
1930 次 |
| 最近记录: |