Scala Spark 使用 Window 函数来查找最大值

b-r*_*yce 2 scala apache-spark apache-spark-sql

我有一个看起来像这样的数据集:

+------------------------|-----+
|               timestamp| zone|
+------------------------+-----+
|    2019-01-01 00:05:00 |    A|
|    2019-01-01 00:05:00 |    A|
|    2019-01-01 00:05:00 |    B|
|    2019-01-01 01:05:00 |    C|
|    2019-01-01 02:05:00 |    B|
|    2019-01-01 02:05:00 |    B|
+------------------------+-----+
Run Code Online (Sandbox Code Playgroud)

对于每个小时,我需要计算哪个区域的行数最多,最后得到一个如下所示的表:

+-----|-----+-----+
| hour| zone| max |
+-----+-----+-----+
|    0|    A|    2|
|    1|    C|    1|
|    2|    B|    2|
+-----+-----+-----+
Run Code Online (Sandbox Code Playgroud)

我的说明说我需要使用 Window 函数和“group by”来找到我的最大计数。

我已经尝试了一些东西,但我不确定我是否接近。任何帮助,将不胜感激。

Rap*_*oth 6

您可以使用 2 个后续窗口函数来获得结果:

df
  .withColumn("hour",hour($"timestamp"))
  .withColumn("cnt",count("*").over(Window.partitionBy($"hour",$"zone")))
  .withColumn("rnb",row_number().over(Window.partitionBy($"hour").orderBy($"cnt".desc)))
  .where($"rnb"===1)
  .select($"hour",$"zone",$"cnt".as("max"))
Run Code Online (Sandbox Code Playgroud)


Che*_*ema 5

您可以使用Windowing functionsgroup by数据框。

在您的情况下,您可以使用rank() over(partition by)窗口函数。

import org.apache.spark.sql.function._

// first group by hour and zone
    val df_group = data_tms.
      select(hour(col("timestamp")).as("hour"), col("zone"))
      .groupBy(col("hour"), col("zone"))
      .agg(count("zone").as("max"))
// second rank by hour order by max in descending order
    val df_rank = df_group.
      select(col("hour"),
        col("zone"),
        col("max"),
        rank().over(Window.partitionBy(col("hour")).orderBy(col("max").desc)).as("rank"))
// filter by col rank = 1
    df_rank
      .select(col("hour"), 
        col("zone"), 
        col("max"))
      .where(col("rank") === 1)
      .orderBy(col("hour"))
     .show()

/*
+----+----+---+
|hour|zone|max|
+----+----+---+
|   0|   A|  2|
|   1|   C|  1|
|   2|   B|  2|
+----+----+---+
*/
Run Code Online (Sandbox Code Playgroud)