按列“ grp”分组并压缩DataFrame-(按列“ ord”排序的每个列的最后一个非空值)

Max*_*axU 5 scala aggregate-functions aggregation apache-spark

假设我有以下DataFrame:

+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  3|null|  11|
|  2|    null|  2| xxx|  22|
|  1|    null|  1| yyy|null|
|  2|    null|  7|null|  33|
|  1|    null| 12|null|null|
|  2|    null| 19|null|  77|
|  1|    null| 10| s13|null|
|  2|    null| 11| a23|null|
+---+--------+---+----+----+
Run Code Online (Sandbox Code Playgroud)

这是带有注释的相同样本DF,按grp和排序ord

scala> df.orderBy("grp", "ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  1| yyy|null|
|  1|    null|  3|null|  11|   # grp:1 - last value for `col2` (11)
|  1|    null| 10| s13|null|   # grp:1 - last value for `col1` (s13)
|  1|    null| 12|null|null|   # grp:1 - last values for `null_col`, `ord`
|  2|    null|  2| xxx|  22|   
|  2|    null|  7|null|  33|   
|  2|    null| 11| a23|null|   # grp:2 - last value for `col1` (a23)
|  2|    null| 19|null|  77|   # grp:2 - last values for `null_col`, `ord`, `col2`
+---+--------+---+----+----+
Run Code Online (Sandbox Code Playgroud)

我想压缩它。即按列对它"grp"进行分组,然后对每组按列对行进行排序,"ord"并取not null每一列中的最后一个值(如果有的话)。

+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null| 12| s13|  11|
|  2|    null| 19| a23|  77|
+---+--------+---+----+----+
Run Code Online (Sandbox Code Playgroud)

我看过以下类似问题:

但是我真正的DataFrame有超过250列,所以我需要一个无需显式指定所有列的解决方案。

我不能把头缠住...


MCVE:如何创建示例数据框:

  1. 创建本地文件“ /tmp/data.txt”,然后将其粘贴并粘贴到DataFrame的上下文中(如上面所述)
  2. 定义功能readSparkOutput()
  3. 将“ /tmp/data.txt”解析为DataFrame:

    val df = readSparkOutput("file:///tmp/data.txt")
    
    Run Code Online (Sandbox Code Playgroud)

更新: 我认为它应该类似于以下SQL:

SELECT
  grp, ord, null_col, col1, col2
FROM (
    SELECT
      grp,
      ord,
      FIRST(null_col) OVER (PARTITION BY grp ORDER BY ord DESC) as null_col,
      FIRST(col1) OVER (PARTITION BY grp ORDER BY ord DESC) as col1,
      FIRST(col2) OVER (PARTITION BY grp ORDER BY ord DESC) as col2,
      ROW_NUMBER() OVER (PARTITION BY grp ORDER BY ord DESC) as rn
    FROM table_name) as v
WHERE v.rn = 1;
Run Code Online (Sandbox Code Playgroud)

我们如何动态生成这样的Spark查询?

我尝试了以下简化方法:

import org.apache.spark.sql.expressions.Window

val win = Window
  .partitionBy("grp")
  .orderBy($"ord".desc)

val cols = df.columns.map(c => first(c, ignoreNulls=true).over(win).as(c))
Run Code Online (Sandbox Code Playgroud)

产生:

scala> cols
res23: Array[org.apache.spark.sql.Column] = Array(first(grp, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `grp`, first(null_col, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `null_col`, first(ord, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `ord`, first(col1, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col1`, first(col2, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col2`)
Run Code Online (Sandbox Code Playgroud)

但我无法将其传递给df.select

scala> df.select(cols.head, cols.tail: _*).show
<console>:34: error: no `: _*' annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
       df.select(cols.head, cols.tail: _*).show
Run Code Online (Sandbox Code Playgroud)

另一尝试:

scala> df.select(cols.map(col): _*).show
<console>:34: error: type mismatch;
 found   : String => org.apache.spark.sql.Column
 required: org.apache.spark.sql.Column => ?
       df.select(cols.map(col): _*).show
Run Code Online (Sandbox Code Playgroud)

eli*_*sah 1

我会采用与@LeoC相同的方法,但我相信不需要将列名作为字符串进行操作,并且我会采用更像spark-sql的答案。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, last}

val win = Window.partitionBy("grp").orderBy(col("ord")).rowsBetween(0, Window.unboundedFollowing)

// In case there is more than one group column
val nonAggCols = Seq("grp")

// Select columns to aggregate on
val cols: Seq[String] = df.columns.diff(nonAggCols).toSeq

// Map over selection and apply fct
val aggregations: Seq[Column] = cols.map(c => first(col(c), ignoreNulls = true).as(c))

// I'd rather cache the following step as it might get expensive
val step1 = cols.foldLeft(df)((acc, c) => acc.withColumn(c, last(col(c), ignoreNulls = true).over(win))).cache

// Finally we can aggregate our results as followed
val results = step1.groupBy(nonAggCols.head, nonAggCols.tail: _*).agg(aggregations.head, aggregations.tail: _*)

results.show
// +---+--------+---+----+----+
// |grp|null_col|ord|col1|col2|
// +---+--------+---+----+----+
// |  1|    null| 12| s13|  11|
// |  2|    null| 19| a23|  77|
// +---+--------+---+----+----+
Run Code Online (Sandbox Code Playgroud)

我希望这有帮助。

编辑:您没有得到相同结果的原因是您使用的阅读器不正确。

它将null文件解释为字符串而不是null; IE :

scala> df.filter('col1.isNotNull).show
// +---+--------+---+----+----+
// |grp|null_col|ord|col1|col2|
// +---+--------+---+----+----+
// |  1|    null|  3|null|  11|
// |  2|    null|  2| xxx|  22|
// |  1|    null|  1| yyy|null|
// |  2|    null|  7|null|  33|
// |  1|    null| 12|null|null|
// |  2|    null| 19|null|  77|
// |  1|    null| 10| s13|null|
// |  2|    null| 11| a23|null|
// +---+--------+---+----+----+
Run Code Online (Sandbox Code Playgroud)

这是我的版本readSparkOutput

def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null" or col(c).isNotNull, col(c))))
}
Run Code Online (Sandbox Code Playgroud)