PySpark动态创建StructType

gam*_*e25 2 apache-spark pyspark databricks delta-lake

我遇到的情况是我的数据如下所示:

ID 价值观 图式
2 {'colA':3.2,'colB':'val2','colC':3.4} {'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT'}
3 {'colC':3.2,'colX':3.9} {'colC':'FLOAT', 'colX':'FLOAT'}
4 {'colG':'val1','colH':93.2} {'colG':'STRING', 'colH':'FLOAT'}
5 {'colG':'val4', 'colA':4.2, 'colJ':93.2, 'colM':'val4'} {'colG':'STRING', 'colA':'FLOAT', 'ColJ':'FLOAT', 'ColM':'STRING'}

和列最初valuesschema存储为StringType. 我想将values列转换为StructType定义每个可能的键的列。最终架构应如下所示:

 |-- id: integer (nullable = false)
 |-- values: struct (nullable = true)
 |    |-- colA: double (nullable = true)
 |    |-- colB: string (nullable = true)
 |    |-- colC: double (nullable = true)
 |    |-- colG: string (nullable = true)
 |    |-- colH: double (nullable = true)
 |    |-- colJ: double (nullable = true)
 |    |-- colM: string (nullable = true)
 |    |-- colX: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我的问题是,是否可以在不明确指定的情况下获取此模式?在我展示的示例中,我们正在讨论几列,但在实际情况中,它是几百列。理想情况下,我想在没有原始schema列的情况下推断模式,但如果需要使用此列,这不是一个大问题(请注意,模式列中的数据类型不一定与 Spark 数据类型匹配。数据需要存储在三角洲。

这可能吗,或者除了将其存储为 MapType(StringType, StringType) 之外没有其他选择吗?

我用于创建测试数据框的代码

data = [
    (
        2,
        "{'colA':3.2, 'colB':'val2', 'colC':3.4}",
        "{'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT}",
    ),
    (
        3, 
        "{'colC':3.2, 'colX':3.9}", 
        "{'colC':'FLOAT', 'colX':'FLOAT'}"),
    (
        4, 
        "{'colG':'val1', 'colH':93.2}", 
        "{'colG':'STRING', 'colH':'FLOAT'}"),
    (
        5,
        "{'colG':'val4', 'colA':4.2, 'colJ':93.2, 'colM':'val4'}",
        "{'colG':'STRING', 'colA':'FLOAT', 'ColJ':'FLOAT', 'ColM':'STRING'}",
    ),
]

schema = T.StructType(
    [
        T.StructField("id", T.IntegerType()),
        T.StructField("values", T.StringType()),
        T.StructField("schema", T.StringType()),
    ]
)

df = spark.createDataFrame(data, schema)
Run Code Online (Sandbox Code Playgroud)

sam*_*art 5

您可以为目标列创建一个架构字符串并用于from_json解析values字段。

例子

# create target schema
jsonsch = data_sdf. \
    groupBy(func.lit(1).alias('dropme')). \
    agg(func.array_join(func.collect_list('schema'), ',').alias('allsch')). \
    withColumn('allsch', func.regexp_replace('allsch', '\},\{', ', ')). \
    select('allsch'). \
    collect()[0][0]

# "{'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT', 'colC':'FLOAT', 'colX':'FLOAT', 'colG':'STRING', 'colH':'FLOAT', 'colG':'STRING', 'colA':'FLOAT', 'colJ':'FLOAT', 'colM':'STRING'}"

import json

jsonschema = ', '.join([k[0]+' '+k[1].lower() for k in json.loads(jsonsch.replace("'", '"')).items()])

# "colA float, colB string, colC float, colX float, colG string, colH float, colJ float, colM string"

# parse the `values` column using the target schema
data_sdf. \
    withColumn('parsed_val', func.from_json('values', jsonschema)). \
    selectExpr('id', 'parsed_val'). \
    show(truncate=False)

# +---+------------------------------------------------+
# |id |parsed_val                                      |
# +---+------------------------------------------------+
# |2  |{3.2, val2, 3.4, null, null, null, null, null}  |
# |3  |{null, null, 3.2, null, null, null, null, 3.9}  |
# |4  |{null, null, null, val1, 93.2, null, null, null}|
# |5  |{4.2, null, null, val4, null, 93.2, val4, null} |
# +---+------------------------------------------------+

# root
#  |-- id: integer (nullable = true)
#  |-- parsed_val: struct (nullable = true)
#  |    |-- colA: float (nullable = true)
#  |    |-- colB: string (nullable = true)
#  |    |-- colC: float (nullable = true)
#  |    |-- colG: string (nullable = true)
#  |    |-- colH: float (nullable = true)
#  |    |-- colJ: float (nullable = true)
#  |    |-- colM: string (nullable = true)
#  |    |-- colX: float (nullable = true)
Run Code Online (Sandbox Code Playgroud)