pyspark: aggregate the most frequent value in a column

Jes*_*ess 2 group-by aggregate pyspark

  # count and sum here are pyspark.sql.functions, not the Python builtins
  from pyspark.sql.functions import count, sum

  aggregrated_table = df_input.groupBy('city', 'income_bracket') \
        .agg(
       count('suburb').alias('suburb'),
       sum('population').alias('population'),
       sum('gross_income').alias('gross_income'),
       sum('no_households').alias('no_households'))

I'd like to group by city and income_bracket, but within each city some suburbs fall into different income brackets. How do I group by the most frequently occurring income bracket in each city?

For example:


if income bracket 10 is the most frequent bracket in a given city, then all of that city's rows should be grouped under income_bracket 10.
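To make the desired behaviour concrete, here is a minimal, purely hypothetical toy df_input (column names taken from the snippet above, all values invented for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample rows, invented for illustration only
df_input = spark.createDataFrame(
    [
        ("city1", "suburb1", 10, 1000, 50000, 300),
        ("city1", "suburb2", 10, 1500, 70000, 450),
        ("city1", "suburb3", 11,  800, 40000, 250),
        ("city2", "suburb4", 12, 2000, 90000, 700),
    ],
    ["city", "suburb", "income_bracket", "population", "gross_income", "no_households"],
)

# Desired outcome: all three city1 rows are aggregated under income_bracket 10
# (its most frequent bracket), and city2's row under income_bracket 12.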

MaF*_*aFF 5

Using a window function before aggregating might do the trick:

from pyspark.sql import Window
import pyspark.sql.functions as psf

w = Window.partitionBy('city')
aggregrated_table = df_input.withColumn(
    "count", 
    psf.count("*").over(w)
).withColumn(
    "rn", 
    psf.row_number().over(w.orderBy(psf.desc("count")))
).filter("rn = 1").groupBy('city', 'income_bracket').agg(
   psf.count('suburb').alias('suburb'),
   psf.sum('population').alias('population'),
   psf.sum('gross_income').alias('gross_income'),
   psf.sum('no_households').alias('no_households'))

You can also use the window function after aggregating, since you are keeping a count of (city, income_bracket) occurrences.
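As a rough sketch of that "window after aggregation" idea (the names aggregated, most_frequent and w below are my own, and the suburb count from the question's aggregation is used as the occurrence count):

from pyspark.sql import Window
import pyspark.sql.functions as psf

# First aggregate per (city, income_bracket), as in the question
aggregated = df_input.groupBy('city', 'income_bracket').agg(
    psf.count('suburb').alias('suburb'),
    psf.sum('population').alias('population'),
    psf.sum('gross_income').alias('gross_income'),
    psf.sum('no_households').alias('no_households'))

# Then keep, for each city, the aggregated row whose bracket occurs most often
w = Window.partitionBy('city').orderBy(psf.desc('suburb'))
most_frequent = (aggregated
                 .withColumn('rn', psf.row_number().over(w))
                 .filter('rn = 1')
                 .drop('rn'))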


mfc*_*era 5

You don't necessarily need window functions:

import pyspark.sql.functions as F

aggregrated_table = (
    df_input.groupby("city", "suburb", "income_bracket")
    .count()
    .withColumn("count_income", F.array("count", "income_bracket"))
    .groupby("city", "suburb")
    .agg(F.max("count_income").getItem(1).alias("most_common_income_bracket"))
)

I think this does what you're asking for. I honestly don't know whether it performs better than the window-based solution.
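For illustration only, a hypothetical per-city variant of the same array/max trick (dropping suburb from the grouping and the most_common / df_labelled names are my own changes, and income_bracket is assumed to be numeric so the array keeps a single element type), whose output could then feed the city-level aggregation from the question:

import pyspark.sql.functions as F

# Most frequent income_bracket per city, via the same array/max trick
most_common = (
    df_input.groupby("city", "income_bracket")
    .count()
    .withColumn("count_income", F.array("count", "income_bracket"))
    .groupby("city")
    .agg(F.max("count_income").getItem(1).alias("most_common_income_bracket"))
)

# Relabel every row with its city's most frequent bracket, after which the original
# groupBy('city', 'income_bracket') aggregation can be run unchanged
df_labelled = (
    df_input.drop("income_bracket")
    .join(most_common, on="city")
    .withColumnRenamed("most_common_income_bracket", "income_bracket")
)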