Arè*_*rès 1 python json apache-spark apache-spark-sql pyspark
我有这个数据框:
cSchema = StructType([StructField("id1", StringType()), StructField("id2", StringType()), StructField("params", StringType())\
,StructField("Col2", IntegerType())])
test_list = [[1, 2, '{"param1": "val1", "param2": "val2"}', 1], [1, 3, '{"param1": "val4", "param2": "val5"}', 3]]
df = spark.createDataFrame(test_list,schema=cSchema)
+---+---+--------------------+----+
|id1|id2| params|Col2|
+---+---+--------------------+----+
| 1| 2|{"param1": "val1"...| 1|
| 1| 3|{"param1": "val4"...| 3|
+---+---+--------------------+----+
Run Code Online (Sandbox Code Playgroud)
我想将参数分解为列:
+---+---+----+------+------+
|id1|id2|Col2|param1|param2|
+---+---+----+------+------+
| 1| 2| 1| val1| val2|
| 1| 3| 3| val4| val5|
+---+---+----+------+------+
Run Code Online (Sandbox Code Playgroud)
所以我编码如下:
schema2 = StructType([StructField("param1", StringType()), StructField("param2", StringType())])
df.withColumn(
"params", from_json("params", schema2)
).select(
col('id1'), col('id2'),col('Col2'), col('params.*')
).show()
Run Code Online (Sandbox Code Playgroud)
问题是params模式是动态的(变量schema2),他可能会从一个执行更改为另一个执行,所以我需要动态推断模式(可以让所有列都具有字符串类型)...而且我无法弄清楚一种方法来做到这一点..
有人可以帮我吗?
在 Pyspark 中,语法应该是:
import pyspark.sql.functions as F
schema = F.schema_of_json(df.select('params').head()[0])
df2 = df.withColumn(
"params", F.from_json("params", schema)
).select(
'id1', 'id2', 'Col2', 'params.*'
)
df2.show()
+---+---+----+------+------+
|id1|id2|Col2|param1|param2|
+---+---+----+------+------+
| 1| 2| 1| val1| val2|
| 1| 3| 3| val4| val5|
+---+---+----+------+------+
Run Code Online (Sandbox Code Playgroud)