Cri*_*ber 2 python apache-spark-sql pyspark
我有一个带有结构数组的 PySpark DataFrame,其中包含两列(colorcode和name)。我想向结构添加一个新列newcol。
这个问题回答了“如何将列添加到嵌套结构”,但我无法将其转移到我的情况,其中结构进一步嵌套在数组内。我似乎无法引用/重新创建数组结构模式。
我的架构:
|-- Id: string (nullable = true)
|-- values: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Dep: long (nullable = true)
| | |-- ABC: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
什么应该变成:
|-- Id: string (nullable = true)
|-- values: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Dep: long (nullable = true)
| | |-- ABC: string (nullable = true)
| | |-- newcol: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
如何将解决方案转移到我的嵌套结构中?
用于获取上述模式的 df 的可重现代码:
data = [
(10, [{"Dep": 10, "ABC": 1}, {"Dep": 10, "ABC": 1}]),
(20, [{"Dep": 20, "ABC": 1}, {"Dep": 20, "ABC": 1}]),
(30, [{"Dep": 30, "ABC": 1}, {"Dep": 30, "ABC": 1}]),
(40, [{"Dep": 40, "ABC": 1}, {"Dep": 40, "ABC": 1}])
]
myschema = StructType(
[
StructField("id", IntegerType(), True),
StructField("values",
ArrayType(
StructType([
StructField("Dep", StringType(), True),
StructField("ABC", StringType(), True)
])
))
]
)
df = spark.createDataFrame(data=data, schema=myschema)
df.printSchema()
df.show(10, False)
Run Code Online (Sandbox Code Playgroud)
对于spark版本>=3.1,您可以使用transform函数和withField方法来实现这一点。
transform根据为array(values此处为列)中的每个元素(此处为 struct(Dep, ABC))提供的函数执行转换计算。withField按名称添加/替换 StructType 中的字段。
df = df.withColumn('values', F.transform('values', lambda x: x.withField('newcol', F.lit(1))))
Run Code Online (Sandbox Code Playgroud)