GroupBy and aggregation not preserving Spark SQL sort order?


I am using Spark 2.1.

If I run the following example:

import org.apache.spark.sql.functions.last
import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell

val seq = Seq((123,"2016-01-01","1"),(123,"2016-01-02","2"),(123,"2016-01-03","3"))

val df = seq.toDF("id","date","score")

val dfAgg = df.sort("id","date").groupBy("id").agg(last("score"))

dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show

The output of the code above is:

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 1|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 2|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 1|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 3|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 3|
+---+------------------+

The intent is to get the score associated with the latest date for each id:

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 3|
+---+------------------+ 

But this clearly doesn't work, as the result is nondeterministic. Do we have to use a window function to achieve this?

Answer (self-answered):

Looking at the documentation for org.apache.spark.sql.catalyst.expressions.aggregate.Last:

/**
 * Returns the last value of `child` for a group of rows. If the last value of `child`
 * is `null`, it returns `null` (respecting nulls). Even if [[Last]] is used on an already
 * sorted column, if we do partial aggregation and final aggregation (when mergeExpression
 * is used) its result will not be deterministic (unless the input table is sorted and has
 * a single partition, and we use a single reducer to do the aggregation.).
 */

shows that, unfortunately, this is the expected behavior.
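
The caveat in that comment can be exercised directly: per the doc, last is only deterministic when the input is sorted and sits in a single partition. A minimal sketch of that workaround (impractical at scale, since it forces all rows through one partition):

// Per the doc comment: with the input sorted within a single partition,
// partial and final aggregation see the rows in order, so last() returns
// the score for the latest date. Not recommended for large data.
val dfAggDet = df
  .repartition(1)
  .sortWithinPartitions("id", "date")
  .groupBy("id")
  .agg(last("score"))

dfAggDet.show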

So, to answer my own question: a window function, as described in SPARK DataFrame: select the first row of each group, now looks like the best way forward.
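
For example, a minimal sketch of that approach, ranking the rows within each id by descending date and keeping only the top-ranked row:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows within each id, newest date first, then keep the top row.
val w = Window.partitionBy("id").orderBy(col("date").desc)

val result = df
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .select("id", "score")

result.show
// +---+-----+
// | id|score|
// +---+-----+
// |123|    3|
// +---+-----+

Unlike last after a sort, row_number is evaluated against the window's own ordering, so the result is deterministic regardless of partitioning.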