gam*_*e25 2 apache-spark pyspark databricks delta-lake
我遇到的情况是我的数据如下所示:
ID | 价值观 | 图式 |
---|---|---|
2 | {'colA':3.2,'colB':'val2','colC':3.4} | {'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT'} |
3 | {'colC':3.2,'colX':3.9} | {'colC':'FLOAT', 'colX':'FLOAT'} |
4 | {'colG':'val1','colH':93.2} | {'colG':'STRING', 'colH':'FLOAT'} |
5 | {'colG':'val4', 'colA':4.2, 'colJ':93.2, 'colM':'val4'} | {'colG':'STRING', 'colA':'FLOAT', 'ColJ':'FLOAT', 'ColM':'STRING'} |
和列最初values
都schema
存储为StringType
. 我想将values
列转换为StructType
定义每个可能的键的列。最终架构应如下所示:
|-- id: integer (nullable = false)
|-- values: struct (nullable = true)
| |-- colA: double (nullable = true)
| |-- colB: string (nullable = true)
| |-- colC: double (nullable = true)
| |-- colG: string (nullable = true)
| |-- colH: double (nullable = true)
| |-- colJ: double (nullable = true)
| |-- colM: string (nullable = true)
| |-- colX: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我的问题是,是否可以在不明确指定的情况下获取此模式?在我展示的示例中,我们正在讨论几列,但在实际情况中,它是几百列。理想情况下,我想在没有原始schema
列的情况下推断模式,但如果需要使用此列,这不是一个大问题(请注意,模式列中的数据类型不一定与 Spark 数据类型匹配。数据需要存储在三角洲。
这可能吗,或者除了将其存储为 MapType(StringType, StringType) 之外没有其他选择吗?
我用于创建测试数据框的代码
data = [
(
2,
"{'colA':3.2, 'colB':'val2', 'colC':3.4}",
"{'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT}",
),
(
3,
"{'colC':3.2, 'colX':3.9}",
"{'colC':'FLOAT', 'colX':'FLOAT'}"),
(
4,
"{'colG':'val1', 'colH':93.2}",
"{'colG':'STRING', 'colH':'FLOAT'}"),
(
5,
"{'colG':'val4', 'colA':4.2, 'colJ':93.2, 'colM':'val4'}",
"{'colG':'STRING', 'colA':'FLOAT', 'ColJ':'FLOAT', 'ColM':'STRING'}",
),
]
schema = T.StructType(
[
T.StructField("id", T.IntegerType()),
T.StructField("values", T.StringType()),
T.StructField("schema", T.StringType()),
]
)
df = spark.createDataFrame(data, schema)
Run Code Online (Sandbox Code Playgroud)
您可以为目标列创建一个架构字符串并用于from_json
解析values
字段。
例子
# create target schema
jsonsch = data_sdf. \
groupBy(func.lit(1).alias('dropme')). \
agg(func.array_join(func.collect_list('schema'), ',').alias('allsch')). \
withColumn('allsch', func.regexp_replace('allsch', '\},\{', ', ')). \
select('allsch'). \
collect()[0][0]
# "{'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT', 'colC':'FLOAT', 'colX':'FLOAT', 'colG':'STRING', 'colH':'FLOAT', 'colG':'STRING', 'colA':'FLOAT', 'colJ':'FLOAT', 'colM':'STRING'}"
import json
jsonschema = ', '.join([k[0]+' '+k[1].lower() for k in json.loads(jsonsch.replace("'", '"')).items()])
# "colA float, colB string, colC float, colX float, colG string, colH float, colJ float, colM string"
# parse the `values` column using the target schema
data_sdf. \
withColumn('parsed_val', func.from_json('values', jsonschema)). \
selectExpr('id', 'parsed_val'). \
show(truncate=False)
# +---+------------------------------------------------+
# |id |parsed_val |
# +---+------------------------------------------------+
# |2 |{3.2, val2, 3.4, null, null, null, null, null} |
# |3 |{null, null, 3.2, null, null, null, null, 3.9} |
# |4 |{null, null, null, val1, 93.2, null, null, null}|
# |5 |{4.2, null, null, val4, null, 93.2, val4, null} |
# +---+------------------------------------------------+
# root
# |-- id: integer (nullable = true)
# |-- parsed_val: struct (nullable = true)
# | |-- colA: float (nullable = true)
# | |-- colB: string (nullable = true)
# | |-- colC: float (nullable = true)
# | |-- colG: string (nullable = true)
# | |-- colH: float (nullable = true)
# | |-- colJ: float (nullable = true)
# | |-- colM: string (nullable = true)
# | |-- colX: float (nullable = true)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
383 次 |
最近记录: |