在Spark SQL中自动且优雅地展平DataFrame

ech*_*hen 37 scala apache-spark apache-spark-sql

所有,

是否有一种优雅且可接受的方式来使用嵌套的列展平Spark SQL表(Parquet) StructType

例如

如果我的架构是:

foo
 |_bar
 |_baz
x
y
z
Run Code Online (Sandbox Code Playgroud)

如何在不依靠手动运行的情况下将其选择为展平的表格形式

df.select("foo.bar","foo.baz","x","y","z")
Run Code Online (Sandbox Code Playgroud)

换句话说,如何在a StructType和a下以编程方式获得上述代码的结果DataFrame

Dav*_*fin 65

简短的回答是,没有"接受"的方法来做到这一点,但你可以非常优雅地使用递归函数,select(...)通过遍历整个函数来生成语句DataFrame.schema.

递归函数应该返回一个Array[Column].每次函数命中时StructType,它都会调用自身并将返回的函数附加Array[Column]到它自己的函数中Array[Column].

就像是:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}
Run Code Online (Sandbox Code Playgroud)

然后你会像这样使用它:

df.select(flattenSchema(df.schema):_*)
Run Code Online (Sandbox Code Playgroud)

  • 以防万一其他人偶然发现:如果你想让新的列名反映原始模式的嵌套结构:`````f1.nested1.nested2 ...```你应该在这一行为列添加别名:` ``case _ => Array(col(colName))```应该变成```case _ => Array(col(colName).alias(colName))``` (7认同)
  • 使用此解决方案,如何处理具有相同名称的最低级别的子节点?例如,父元素Foo有子Bar,而父元素Foz也有一个单独的子Bar。当从初始数据帧中选择Foo.Bar和Foz.Bar时(带有由flattenSchema返回的Array),我得到2个列,均名为Bar。但是我希望列标题为`Foo.Bar`或`Foo_Bar`或类似的东西。因此,每个人都将是唯一且毫不含糊的。 (4认同)
  • TheM00s3,您将导入org.apache.spark.sql.functions.col,它也应在Spark 2.1.x中正常工作(到目前为止,仅在Scala中尝试过此操作,而不是Java) (3认同)

V. *_*mma 19

我正在改进我以前的答案,并在接受的答案的评论中提出我自己的问题的解决方案.

此接受的解决方案创建一个Column对象数组,并使用它来选择这些列.在星火,如果你有一个嵌套的数据帧,可以选择这样的子列:df.select("Parent.Child")这会返回一个数据框与孩子列的值,并命名为儿童.但是,如果你有不同的父结构的属性相同的名称,你失去对家长的信息,并最终可能具有相同的列名,不能由名字再访问他们,因为他们是毫不含糊的.

这是我的问题.

我找到了解决问题的方法,也许它可以帮助别人.我flattenSchema单独打电话:

val flattenedSchema = flattenSchema(df.schema)
Run Code Online (Sandbox Code Playgroud)

并返回一个Column对象数组.我select()会将原始列名称作为字符串映射到自己,而不是在最后一个级别的子项中返回一个DataFrame Parent.Child,而是在选择列之后,将其重命名为(我也替换了点)为了我的方便,使用下划线):Parent.ChildChild

val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))
Run Code Online (Sandbox Code Playgroud)

然后你可以使用原始答案中显示的select函数:

var newDf = df.select(renamedCols:_*)
Run Code Online (Sandbox Code Playgroud)


Eva*_*n V 12

只是想分享我对Pyspark的解决方案 - 它或多或少是@David Griffin解决方案的翻译,所以它支持任何级别的嵌套对象.

from pyspark.sql.types import StructType, ArrayType  

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields


df.select(flatten(df.schema)).show()
Run Code Online (Sandbox Code Playgroud)


Pow*_*ers 5

DataFrame#flattenSchema向开源spark-daria项目添加了一个方法。

下面介绍了如何在代码中使用该函数。

import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.flattenSchema().show()

+-------+-------+---------+----+---+
|foo.bar|foo.baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+
Run Code Online (Sandbox Code Playgroud)

您还可以使用该方法指定不同的列名分隔符flattenSchema()

df.flattenSchema(delimiter = "_").show()
+-------+-------+---------+----+---+
|foo_bar|foo_baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+
Run Code Online (Sandbox Code Playgroud)

这个分隔符参数非常重要。如果您要展平架构以在 Redshift 中加载表,则将无法使用句点作为分隔符。

这是生成此输出的完整代码片段。

val data = Seq(
  Row(Row("this", "is"), "something", "cool", ";)")
)

val schema = StructType(
  Seq(
    StructField(
      "foo",
      StructType(
        Seq(
          StructField("bar", StringType, true),
          StructField("baz", StringType, true)
        )
      ),
      true
    ),
    StructField("x", StringType, true),
    StructField("y", StringType, true),
    StructField("z", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)

df.flattenSchema().show()
Run Code Online (Sandbox Code Playgroud)

底层代码类似于 David Griffin 的代码(如果您不想将 Spark-daria 依赖项添加到您的项目中)。

object StructTypeHelpers {

  def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
    schema.fields.flatMap(structField => {
      val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
      val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name

      structField.dataType match {
        case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
        case _ => Array(col(codeColName).alias(colName))
      }
    })
  }

}

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
      df.select(
        StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
      )
    }

  }

}
Run Code Online (Sandbox Code Playgroud)

  • 我们可以添加对 Array<Struct> 和 Array. (2认同)