使用PySpark删除Dataframe的嵌套列

Pie*_*rre 5 dataframe apache-spark pyspark

我正在尝试使用pyspark在Spark数据框中删除一些嵌套的列。我为Scala找到了这个功能,它似乎完全可以满足我的要求,但是我对Scala并不熟悉,也不知道如何用Python编写它。

/sf/answers/2796066871/

我真的很感谢您的帮助。

谢谢,

Spa*_*ner 5

pyspark 示例:

def drop_col(df, struct_nm, delete_struct_child_col_nm):
    fields_to_keep = filter(lambda x:  x != delete_struct_child_col_nm, df.select("{}.*".format(struct_nm)).columns)
    fields_to_keep = list(map(lambda x:  "{}.{}".format(struct_nm, x), fields_to_keep))
    return df.withColumn(struct_nm, struct(fields_to_keep))
Run Code Online (Sandbox Code Playgroud)


dek*_*iya 3

我发现使用 pyspark 的方法是首先将嵌套列转换为 json,然后使用新的嵌套模式解析转换后的 json,并过滤掉不需要的列。

假设我有以下架构,并且我想从数据框中删除d,ej( a.b.d, a.e, ) :a.h.j

root
 |-- a: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- c: long (nullable = true)
 |    |    |-- d: string (nullable = true)
 |    |-- e: struct (nullable = true)
 |    |    |-- f: long (nullable = true)
 |    |    |-- g: string (nullable = true)
 |    |-- h: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- i: string (nullable = true)
 |    |    |    |-- j: string (nullable = true)
 |-- k: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我使用了以下方法:

  1. a通过排除d,e和来创建新架构j。执行此操作的快速方法是手动选择所需的字段df.select("a").schema,并使用 从所选字段创建新架构StructType。或者,您可以通过遍历架构树并排除不需要的字段以编程方式执行此操作,例如:

    def exclude_nested_field(schema, unwanted_fields, parent=""):
        new_schema = []
    
        for field in schema:
            full_field_name = field.name
            if parent:
                full_field_name = parent + "." + full_field_name
    
            if full_field_name not in unwanted_fields:
                if isinstance(field.dataType, StructType):
                    inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, inner_schema))
                elif isinstance(field.dataType, ArrayType):
                    new_schema.append(StructField(field.name, ArrayType(field.dataType.elementType)))
                else:
                    new_schema.append(StructField(field.name, field.dataType))
    
        return StructType(new_schema)
    
    new_schema = exclude_nested_field(df.schema["a"].dataType, ["b.d", "e", "h.j"])
    
    Run Code Online (Sandbox Code Playgroud)
  2. 将列转换a为 json:.withColumn("json", F.to_json("a")).drop("a")

  3. a使用步骤 1 中找到的新架构解析步骤 2 中的json 转换列:.withColumn("a", F.from_json("json", new_schema)).drop("json")