Pie*_*rre 5 dataframe apache-spark pyspark
我正在尝试使用pyspark在Spark数据框中删除一些嵌套的列。我为Scala找到了这个功能,它似乎完全可以满足我的要求,但是我对Scala并不熟悉,也不知道如何用Python编写它。
我真的很感谢您的帮助。
谢谢,
我们现在可以使用 Spark 版本 >= 3.1 本地完成此操作
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.dropFields.html
pyspark 示例:
def drop_col(df, struct_nm, delete_struct_child_col_nm):
fields_to_keep = filter(lambda x: x != delete_struct_child_col_nm, df.select("{}.*".format(struct_nm)).columns)
fields_to_keep = list(map(lambda x: "{}.{}".format(struct_nm, x), fields_to_keep))
return df.withColumn(struct_nm, struct(fields_to_keep))
Run Code Online (Sandbox Code Playgroud)
我发现使用 pyspark 的方法是首先将嵌套列转换为 json,然后使用新的嵌套模式解析转换后的 json,并过滤掉不需要的列。
假设我有以下架构,并且我想从数据框中删除d,e和j( a.b.d, a.e, ) :a.h.j
root
|-- a: struct (nullable = true)
| |-- b: struct (nullable = true)
| | |-- c: long (nullable = true)
| | |-- d: string (nullable = true)
| |-- e: struct (nullable = true)
| | |-- f: long (nullable = true)
| | |-- g: string (nullable = true)
| |-- h: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- i: string (nullable = true)
| | | |-- j: string (nullable = true)
|-- k: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我使用了以下方法:
a通过排除d,e和来创建新架构j。执行此操作的快速方法是手动选择所需的字段df.select("a").schema,并使用 从所选字段创建新架构StructType。或者,您可以通过遍历架构树并排除不需要的字段以编程方式执行此操作,例如:
def exclude_nested_field(schema, unwanted_fields, parent=""):
new_schema = []
for field in schema:
full_field_name = field.name
if parent:
full_field_name = parent + "." + full_field_name
if full_field_name not in unwanted_fields:
if isinstance(field.dataType, StructType):
inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
new_schema.append(StructField(field.name, inner_schema))
elif isinstance(field.dataType, ArrayType):
new_schema.append(StructField(field.name, ArrayType(field.dataType.elementType)))
else:
new_schema.append(StructField(field.name, field.dataType))
return StructType(new_schema)
new_schema = exclude_nested_field(df.schema["a"].dataType, ["b.d", "e", "h.j"])
Run Code Online (Sandbox Code Playgroud)
将列转换a为 json:.withColumn("json", F.to_json("a")).drop("a")
a使用步骤 1 中找到的新架构解析步骤 2 中的json 转换列:.withColumn("a", F.from_json("json", new_schema)).drop("json")