使用 Spark 按行和列扩展 JSON 字符串

Ron*_*som 6 json apache-spark-sql pyspark

我是 Spark 新手,正在使用 JSON,但在做一些相当简单的事情时遇到了困难(我认为)。我尝试过使用类似问题的部分解决方案,但不太正确。我目前有一个 Spark 数据框,其中有几列代表变量。每行都是变量值的唯一组合。然后,我有一个应用于每一行的 UDF,它将每一列作为输入,进行一些分析,并以 JSON 字符串的形式输出每行的汇总表,并将这些结果保存在表的新列中。一些小样本数据如下所示:

+------+-----+------+-------------------------------------------------------------------
|Var 1 |Var 2|Var 3 |JSON Table 
+------+------------+-------------------------------------------------------------------
|True  |10%  |200   |[{"Out_1": "Mean", "Out_2": "25"}, {"Out_1": "Median", "Out_2": "21"}]
|False |15%  |150   |[{"Out_1": "Mean", "Out_2": "19"}, {"Out_1": "Median", "Out_2": "18"}]
|True  |12%  |100   |[{"Out_1": "Mean", "Out_2": "22"}, {"Out_1": "Median", "Out_2": "20"}]
Run Code Online (Sandbox Code Playgroud)

我想将其转换为以下格式:

+------+-----+------+------+-----+
|Var 1 |Var 2|Var 3 |Out_1 |Out_2| 
+------+------------+------+-----+
|True  |10%  |200   |Mean  |25   |
|True  |10%  |200   |Median|21   |
|False |15%  |150   |Mean  |19   |
|False |15%  |150   |Median|18   |
|True  |12%  |100   |Mean  |22   |
|True  |12%  |100   |Median|20   |

Run Code Online (Sandbox Code Playgroud)

实际上,有更多的变量、数百万行和更大的 JSON 字符串以及更多的输出,但核心问题仍然相同。我基本上尝试获取 JSON 模式并使用 from_json ,如下所示:

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = spark.read.json(df.rdd.map(lambda row: row["JSON Table"])).schema

df = df\
     .withColumn("JSON Table", from_json("JSON Table", schema))\
     .select(col('*'), col('JSON Table.*'))\

df.show()
Run Code Online (Sandbox Code Playgroud)

这似乎正确地获取了 JSON 结构(尽管每个值都被读取为字符串,但大多数都是整数),但生成的数据帧是空的,尽管具有正确的列标题。关于如何处理这个问题有什么建议吗?

mur*_*ash 4

假设您的JSON table列是json string. 您可以显式设置您的schemaexplode(from_json)然后设置select您的列。

df.show() #sample dataframe
+-----+-----+-----+----------------------------------------------------------------------+
|Var 1|Var 2|Var 3|JSON Table                                                            |
+-----+-----+-----+----------------------------------------------------------------------+
|true |10%  |200  |[{"Out_1": "Mean", "Out_2": "25"}, {"Out_1": "Median", "Out_2": "21"}]|
|false|15%  |150  |[{"Out_1": "Mean", "Out_2": "19"}, {"Out_1": "Median", "Out_2": "18"}]|
|true |12%  |100  |[{"Out_1": "Mean", "Out_2": "22"}, {"Out_1": "Median", "Out_2": "20"}]|
+-----+-----+-----+----------------------------------------------------------------------+

#sample schema     
#root
 #|-- Var 1: boolean (nullable = true)
 #|-- Var 2: string (nullable = true)
 #|-- Var 3: long (nullable = true)
 #|-- JSON Table: string (nullable = true)


from pyspark.sql import functions as F
from pyspark.sql.types import *

schema = ArrayType(MapType(StringType(),StringType()))

df.withColumn("JSON Table", F.explode(F.from_json("JSON Table", schema)))\
  .select("Var 1","Var 2","Var 3","JSON Table.Out_1","JSON Table.Out_2").show()

+-----+-----+-----+------+-----+
|Var 1|Var 2|Var 3| Out_1|Out_2|
+-----+-----+-----+------+-----+
| true|  10%|  200|  Mean|   25|
| true|  10%|  200|Median|   21|
|false|  15%|  150|  Mean|   19|
|false|  15%|  150|Median|   18|
| true|  12%|  100|  Mean|   22|
| true|  12%|  100|Median|   20|
+-----+-----+-----+------+-----+
Run Code Online (Sandbox Code Playgroud)