b-r*_*yce 2 scala apache-spark apache-spark-sql
我有一个看起来像这样的数据集:
+------------------------|-----+
| timestamp| zone|
+------------------------+-----+
| 2019-01-01 00:05:00 | A|
| 2019-01-01 00:05:00 | A|
| 2019-01-01 00:05:00 | B|
| 2019-01-01 01:05:00 | C|
| 2019-01-01 02:05:00 | B|
| 2019-01-01 02:05:00 | B|
+------------------------+-----+
Run Code Online (Sandbox Code Playgroud)
对于每个小时,我需要计算哪个区域的行数最多,最后得到一个如下所示的表:
+-----|-----+-----+
| hour| zone| max |
+-----+-----+-----+
| 0| A| 2|
| 1| C| 1|
| 2| B| 2|
+-----+-----+-----+
Run Code Online (Sandbox Code Playgroud)
我的说明说我需要使用 Window 函数和“group by”来找到我的最大计数。
我已经尝试了一些东西,但我不确定我是否接近。任何帮助,将不胜感激。
您可以使用 2 个后续窗口函数来获得结果:
df
.withColumn("hour",hour($"timestamp"))
.withColumn("cnt",count("*").over(Window.partitionBy($"hour",$"zone")))
.withColumn("rnb",row_number().over(Window.partitionBy($"hour").orderBy($"cnt".desc)))
.where($"rnb"===1)
.select($"hour",$"zone",$"cnt".as("max"))
Run Code Online (Sandbox Code Playgroud)
您可以使用Windowing functions和group by数据框。
在您的情况下,您可以使用rank() over(partition by)窗口函数。
import org.apache.spark.sql.function._
// first group by hour and zone
val df_group = data_tms.
select(hour(col("timestamp")).as("hour"), col("zone"))
.groupBy(col("hour"), col("zone"))
.agg(count("zone").as("max"))
// second rank by hour order by max in descending order
val df_rank = df_group.
select(col("hour"),
col("zone"),
col("max"),
rank().over(Window.partitionBy(col("hour")).orderBy(col("max").desc)).as("rank"))
// filter by col rank = 1
df_rank
.select(col("hour"),
col("zone"),
col("max"))
.where(col("rank") === 1)
.orderBy(col("hour"))
.show()
/*
+----+----+---+
|hour|zone|max|
+----+----+---+
| 0| A| 2|
| 1| C| 1|
| 2| B| 2|
+----+----+---+
*/
Run Code Online (Sandbox Code Playgroud)