展平嵌套的Spark Dataframe

Joh*_*ohn 7 apache-spark pyspark spark-dataframe

有没有办法压缩任意嵌套的Spark Dataframe?我所看到的大多数工作都是针对特定的模式编写的,我希望能够通过不同的嵌套类型(例如StructType,ArrayType,MapType等)来泛化一个Dataframe.

假设我有一个类似的架构:

StructType(List(StructField(field1,...), StructField(field2,...), ArrayType(StructType(List(StructField(nested_field1,...), StructField(nested_field2,...)),nested_array,...)))
Run Code Online (Sandbox Code Playgroud)

希望将其调整为具有如下结构的平台:

field1
field2
nested_array.nested_field1
nested_array.nested_field2
Run Code Online (Sandbox Code Playgroud)

仅供参考,寻找Pyspark的建议,但其他风味的Spark也值得赞赏.

MaF*_*aFF 14

这个问题可能有点旧,但对于那些仍在寻找解决方案的人来说,你可以使用select*来内联复杂的数据类型:

首先让我们创建嵌套的数据帧:

from pyspark.sql import HiveContext
hc = HiveContext(sc)
nested_df = hc.read.json(sc.parallelize(["""
{
  "field1": 1, 
  "field2": 2, 
  "nested_array":{
     "nested_field1": 3,
     "nested_field2": 4
  }
}
"""]))
Run Code Online (Sandbox Code Playgroud)

现在要压扁它:

flat_df = nested_df.select("field1", "field2", "nested_array.*")
Run Code Online (Sandbox Code Playgroud)

你会在这里找到有用的例子https://docs.databricks.com/spark/latest/spark-sql/complex-types.html

如果嵌套数组太多,可以使用:

flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = nested_df.select(*flat_cols, *[c + ".*" for c in nested_cols])
Run Code Online (Sandbox Code Playgroud)


Nar*_*B M 5

这会展平具有结构类型 数组类型的嵌套 df。通过 Json 读取数据时通常会有所帮助。对此进行了改进/sf/answers/3957342161/

from pyspark.sql.types import *
from pyspark.sql import functions as f

def flatten_structs(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        
        parents, df = stack.pop()
        
        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
        
        flat_cols = [
            f.col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]
        
        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))
        
    return nested_df.select(columns)

def flatten_array_struct_df(df):
    
    array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    
    while len(array_cols) > 0:
        
        for array_col in array_cols:
            
            cols_to_select = [x for x in df.columns if x != array_col ]
            
            df = df.withColumn(array_col, f.explode(f.col(array_col)))
            
        df = flatten_structs(df)
        
        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    return df

flat_df = flatten_array_struct_df(df)
Run Code Online (Sandbox Code Playgroud)

**


Joh*_*ohn 2

这是我的最终方法:

1)将数据帧中的行映射到字典的rdd。在线查找合适的 python 代码来扁平化 dict。

flat_rdd = nested_df.map(lambda x : flatten(x))
Run Code Online (Sandbox Code Playgroud)

在哪里

def flatten(x):
  x_dict = x.asDict()
  ...some flattening code...
  return x_dict
Run Code Online (Sandbox Code Playgroud)

2)将RDD[dict]转换回数据帧

flat_df = sqlContext.createDataFrame(flat_rdd)
Run Code Online (Sandbox Code Playgroud)