I have a dataframe with a schema like
root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
I want to add a column nested inside the state struct, i.e. produce a dataframe with a schema like
root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |    |-- a: integer (nullable = true)
But instead I get
root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |-- state.a: integer (nullable = true)
Here is what I tried:
df.withColumn('state.a', val)
Use a transformation like the following:
import pyspark.sql.functions as f

df = df.withColumn(
    "state",
    f.struct(
        f.col("state.*"),
        f.lit(123).alias("a")
    )
)
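As a side note: if you happen to be on Spark 3.1 or later, Column.withField can add a nested field without rebuilding the whole struct by hand. A minimal sketch (the 123 literal is just a placeholder value):

import pyspark.sql.functions as f

# Spark 3.1+ only: withField adds (or replaces) a single field inside a struct column
df = df.withColumn("state", f.col("state").withField("a", f.lit(123)))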
Here is a way to do it without using a udf:
# create example dataframe
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, IntegerType

data = [
    ({'fld': 0},)
]
schema = StructType(
    [
        StructField(
            'state',
            StructType(
                [StructField('fld', IntegerType())]
            )
        )
    ]
)
# sqlCtx is a SQLContext; on Spark 2+ you can call spark.createDataFrame instead
df = sqlCtx.createDataFrame(data, schema)
df.printSchema()
#root
# |-- state: struct (nullable = true)
# |    |-- fld: integer (nullable = true)
Now, use withColumn() and add the new field using lit() and alias():
val = 1
df_new = df.withColumn(
    'state',
    f.struct(*[f.col('state')['fld'].alias('fld'), f.lit(val).alias('a')])
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)
If the nested struct has many fields, you can use a list comprehension, with df.schema["state"].dataType.names to get the field names. For example:
val = 1
s_fields = df.schema["state"].dataType.names  # ['fld']
df_new = df.withColumn(
    'state',
    f.struct(*([f.col('state')[c].alias(c) for c in s_fields] + [f.lit(val).alias('a')]))
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)
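If this pattern comes up repeatedly, the list comprehension can be wrapped in a small helper. The name with_struct_field below is hypothetical, just a sketch of the approach above:

import pyspark.sql.functions as f

def with_struct_field(df, struct_col, field_name, value_col):
    # hypothetical helper: append value_col as a new field inside struct_col,
    # keeping the existing fields via the same list-comprehension pattern
    existing = df.schema[struct_col].dataType.names
    return df.withColumn(
        struct_col,
        f.struct(*([f.col(struct_col)[c].alias(c) for c in existing]
                   + [value_col.alias(field_name)]))
    )

# usage, matching the example above:
# df_new = with_struct_field(df, 'state', 'a', f.lit(1))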
Although this is a late answer, the following is supported for PySpark version 2.x.
Assume dfOld already contains state and fld as posed in the question (level1Field1 and level1Field2 stand in for any other top-level columns):
from pyspark.sql.functions import col, lit, struct

dfOld = dfOld.withColumn("a", lit("value"))  # lit() wraps the literal; reassign to keep the new column
dfNew = dfOld.select("level1Field1", "level1Field2", struct(col("state.fld").alias("fld"), col("a")).alias("state"))
Reference: https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803
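For completeness, here is a self-contained sketch of this select/struct approach, assuming a dataframe with only the state column from the question (no other top-level columns), so the select keeps just the rebuilt struct:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, struct
from pyspark.sql.types import StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField('state', StructType([StructField('fld', IntegerType())]))
])
dfOld = spark.createDataFrame([((0,),)], schema)

dfNew = (
    dfOld
    .withColumn('a', lit(1))  # temporary top-level column
    .select(struct(col('state.fld').alias('fld'), col('a')).alias('state'))
)
dfNew.printSchema()  # state now contains both fld and a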