如何在pyspark数据框中的嵌套结构中添加列?

MrC*_*ogy 12 apache-spark apache-spark-sql pyspark

我有一个类似于架构的数据框

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我想在state结构中添加列,即创建一个具有类似架构的数据帧

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |    |-- a: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

但相反,我得到了

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |-- state.a: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

这是尝试

df.withColumn('state.a', val)
Run Code Online (Sandbox Code Playgroud)

mal*_*the 22

使用如下所示的转换:

import pyspark.sql.functions as f

df = df.withColumn(
    "state",
    f.struct(
        f.col("state.*"),
        f.lit(123).alias("a")
    )
)
Run Code Online (Sandbox Code Playgroud)


pau*_*ult 9

这是一种不使用以下方法的方法udf:

# create example dataframe
import pyspark.sql.functions as f
data = [
    ({'fld': 0},)
]

schema = StructType(
    [
        StructField('state',
            StructType(
                [StructField('fld', IntegerType())]
            )
        )
    ]
)

df = sqlCtx.createDataFrame(data, schema)
df.printSchema()
#root
# |-- state: struct (nullable = true)
# |    |-- fld: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

现在,使用withColumn()和添加使用新的领域lit()alias().

val = 1
df_new = df.withColumn(
    'state', 
    f.struct(*[f.col('state')['fld'].alias('fld'), f.lit(val).alias('a')])
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)
Run Code Online (Sandbox Code Playgroud)

如果嵌套结构中有很多字段,则可以使用列表推导,使用df.schema["state"].dataType.names获取字段名称.例如:

val = 1
s_fields = df.schema["state"].dataType.names # ['fld']
df_new = df.withColumn(
    'state', 
    f.struct(*([f.col('state')[c].alias(c) for c in s_fields] + [f.lit(val).alias('a')]))
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)
Run Code Online (Sandbox Code Playgroud)

参考

  • 我找到了一种从Struct获取字段名称的方法,而不是从这个答案中手动命名它们.


des*_*itb 5

虽然这是一个太晚的答案,但对于 pyspark 版本 2.xx,支持以下内容。

假设dfOld已经包含state并按fld问题提出。

dfOld.withColumn("a","value") dfNew = dfOld.select("level1Field1", "level1Field2", struct(col("state.fld").alias("fld"), col("a")).alias("state"))

参考:https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803