Infer schema from a JSON string

Arè*_*rès 1 python json apache-spark apache-spark-sql pyspark

I have this DataFrame:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

cSchema = StructType([StructField("id1", StringType()), StructField("id2", StringType()),
                      StructField("params", StringType()), StructField("Col2", IntegerType())])

test_list = [[1, 2, '{"param1": "val1", "param2": "val2"}', 1], [1, 3, '{"param1": "val4", "param2": "val5"}', 3]]

df = spark.createDataFrame(test_list,schema=cSchema) 

+---+---+--------------------+----+
|id1|id2|              params|Col2|
+---+---+--------------------+----+
|  1|  2|{"param1": "val1"...|   1|
|  1|  3|{"param1": "val4"...|   3|
+---+---+--------------------+----+

I want to expand params into columns:

+---+---+----+------+------+
|id1|id2|Col2|param1|param2|
+---+---+----+------+------+
|  1|  2|   1|  val1|  val2|
|  1|  3|   3|  val4|  val5|
+---+---+----+------+------+

So I wrote the following:

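from pyspark.sql.functions import col, from_json

schema2 = StructType([StructField("param1", StringType()), StructField("param2", StringType())])

# Parse the JSON string into a struct, then flatten its fields into columns
df.withColumn(
  "params", from_json("params", schema2)
).select(
  col('id1'), col('id2'), col('Col2'), col('params.*')
).show()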

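The problem is that the schema of params (the variable schema2) is dynamic: it may change from one run to the next, so I need to infer it dynamically (it is fine if every column ends up as a string). I can't figure out a way to do this.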

Can anyone help me?

mck*_*mck 7

In PySpark, the syntax should be:

import pyspark.sql.functions as F

# Infer the schema from the JSON string in the first row of params
schema = F.schema_of_json(df.select('params').head()[0])

df2 = df.withColumn(
  "params", F.from_json("params", schema)
).select(
  'id1', 'id2', 'Col2', 'params.*'
)

df2.show()
+---+---+----+------+------+
|id1|id2|Col2|param1|param2|
+---+---+----+------+------+
|  1|  2|   1|  val1|  val2|
|  1|  3|   3|  val4|  val5|
+---+---+----+------+------+
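
One caveat with the snippet above: schema_of_json only looks at the single sample string it is given (here, the first row), so keys that appear only in other rows would not become columns. Since the question says all-string columns are acceptable, one possible workaround is to collect the keys from several sample strings in plain Python and build a DDL schema string from them. This is only a rough sketch: string_schema_ddl is a hypothetical helper name, not part of PySpark, and the second sample row with param3 is invented here to show differing key sets.

```python
import json


def string_schema_ddl(json_strings):
    """Build a DDL schema string that types every top-level key as STRING.

    Hypothetical helper (not a PySpark API). Keys are gathered across all
    sample strings in first-seen order, so rows with different key sets
    are all covered.
    """
    keys = []
    for raw in json_strings:
        if raw is None:
            continue  # skip null cells
        obj = json.loads(raw)
        if not isinstance(obj, dict):
            continue  # only top-level JSON objects contribute keys
        for key in obj:
            if key not in keys:
                keys.append(key)
    # Backticks quote each name; keys that themselves contain a backtick
    # are not handled by this sketch.
    return ", ".join(f"`{key}` STRING" for key in keys)


# Invented sample data: the second row has a key the first one lacks.
samples = [
    '{"param1": "val1", "param2": "val2"}',
    '{"param1": "val4", "param3": "val6"}',
]
ddl = string_schema_ddl(samples)
print(ddl)
```

With the DataFrame from the question, the samples could come from something like [r.params for r in df.select('params').collect()], and the resulting string can be passed as the schema argument, e.g. F.from_json('params', ddl), since from_json also accepts a DDL-formatted string. Note that collect() pulls the whole column to the driver, so on large data it is probably better to sample or limit first.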