I'm using Spark 2.1.
If I run the following example:
// In spark-shell these imports are available implicitly; in a standalone
// application you need them explicitly:
import org.apache.spark.sql.functions.last
import spark.implicits._

val seq = Seq((123,"2016-01-01","1"),(123,"2016-01-02","2"),(123,"2016-01-03","3"))
val df = seq.toDF("id","date","score")
val dfAgg = df.sort("id","date").groupBy("id").agg(last("score"))
dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
The output of the code above is:
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 1|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 2|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 1|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 3|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 3|
+---+------------------+
The intent is to get the score associated with the most recent date for each id:
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 3|
+---+------------------+
But this clearly doesn't work, because the result is non-deterministic. Do we have to use a window function to achieve this?
Looking at the documentation for org.apache.spark.sql.catalyst.expressions.aggregate.Last:
/**
* Returns the last value of `child` for a group of rows. If the last value of `child`
* is `null`, it returns `null` (respecting nulls). Even if [[Last]] is used on an already
* sorted column, if we do partial aggregation and final aggregation (when mergeExpression
* is used) its result will not be deterministic (unless the input table is sorted and has
* a single partition, and we use a single reducer to do the aggregation.).
*/
shows that this is, unfortunately, the expected behaviour.
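Per the doc comment's own caveat, last() is only deterministic when the input is sorted and lives in a single partition, so a single reducer performs the aggregation. A minimal sketch of that caveat on the example above; it works for this toy data but defeats parallelism, so it is not a general fix:

// Force one sorted partition; with a single reducer, last() in sort order is stable.
val dfDet = df.repartition(1)
  .sortWithinPartitions("id", "date")
  .groupBy("id")
  .agg(last("score"))
dfDet.show  // reliably |123| 3|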
So, answering my own question: a window function, as described in SPARK DataFrame: select the first row of each group, now looks like the best way forward.
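For reference, a minimal sketch of that window-function approach applied to the example DataFrame above (row_number is one ranking function that works here; ties on date would need an extra tiebreaker column):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows within each id by date, newest first, and keep only the top row.
val w = Window.partitionBy("id").orderBy(col("date").desc)
val latest = df
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("rn")
latest.show
// +---+----------+-----+
// | id|      date|score|
// +---+----------+-----+
// |123|2016-01-03|    3|
// +---+----------+-----+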