GroupBy在Pyspark中具有最大值的列和过滤器行

Tho*_*mas 19 python apache-spark apache-spark-sql pyspark

我几乎肯定以前曾经问过这个问题,但是通过stackoverflow搜索并没有回答我的问题.不是[2]的重复,因为我想要最大值,而不是最频繁的项目.我是pyspark的新手,并且尝试做一些非常简单的事情:我想将groupBy列"A"分组,然后只保留每个组中具有"B"列中最大值的行.像这样:

df_cleaned = df.groupBy("A").agg(F.max("B"))
Run Code Online (Sandbox Code Playgroud)

不幸的是,这会抛弃所有其他列 - df_cleaned只包含列"A"和B的最大值.我如何保留行?("A","B","C"......)

pau*_*ult 26

您可以在不udf使用a的情况下执行此操作Window.

请考虑以下示例:

import pyspark.sql.functions as f
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
    ('b', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])
df.show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  5|
#|  a|  8|
#|  a|  7|
#|  b|  1|
#|  b|  3|
#+---+---+
Run Code Online (Sandbox Code Playgroud)

Window按列创建分区,A并使用此分区计算每个组的最大值.然后过滤掉行,使列中的值B等于max.

from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB')\
    .show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  8|
#|  b|  3|
#+---+---+
Run Code Online (Sandbox Code Playgroud)

或等效使用pyspark-sql:

df.registerTempTable('table')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()
#+---+---+
#|  A|  B|
#+---+---+
#|  b|  3|
#|  a|  8|
#+---+---+
Run Code Online (Sandbox Code Playgroud)


Fer*_*ann 7

有两个很好的解决方案,所以我决定对它们进行基准测试。首先让我定义一个更大的数据框:

\n
N_SAMPLES = 600000\nN_PARTITIONS = 1000\nMAX_VALUE = 100\ndata = zip([random.randint(0, N_PARTITIONS-1) for i in range(N_SAMPLES)],\n          [random.randint(0, MAX_VALUE) for i in range(N_SAMPLES)],\n          list(range(N_SAMPLES))\n          )\ndf = spark.createDataFrame(data, ["A", "B", "C"])\ndf.show()\n+---+---+---+\n|  A|  B|  C|\n+---+---+---+\n|118| 91|  0|\n|439| 80|  1|\n|779| 77|  2|\n|444| 14|  3|\n...\n
Run Code Online (Sandbox Code Playgroud)\n

对 @pault 的解决方案进行基准测试:

\n
%%timeit\nw = Window.partitionBy(\'A\')\ndf_collect = df.withColumn(\'maxB\', f.max(\'B\').over(w))\\\n    .where(f.col(\'B\') == f.col(\'maxB\'))\\\n    .drop(\'maxB\')\\\n    .collect()\n
Run Code Online (Sandbox Code Playgroud)\n

给出

\n
655 ms \xc2\xb1 70.9 ms per loop (mean \xc2\xb1 std. dev. of 7 runs, 1 loop each)\n
Run Code Online (Sandbox Code Playgroud)\n

对 @ndricca\ 的解决方案进行基准测试:

\n
655 ms \xc2\xb1 70.9 ms per loop (mean \xc2\xb1 std. dev. of 7 runs, 1 loop each)\n
Run Code Online (Sandbox Code Playgroud)\n

给出

\n
1 s \xc2\xb1 49.1 ms per loop (mean \xc2\xb1 std. dev. of 7 runs, 1 loop each)\n
Run Code Online (Sandbox Code Playgroud)\n

所以,@pault 的解决方案似乎快了 1.5 倍。非常欢迎对此基准的反馈。

\n


ndr*_*cca 6

另一种可能的方法是应用指定“ leftsemi”本身的联接数据框。这种连接在左侧包括数据框中的所有列,而在右侧不包括任何列。

例如:

import pyspark.sql.functions as f
data = [
    ('a', 5, 'c'),
    ('a', 8, 'd'),
    ('a', 7, 'e'),
    ('b', 1, 'f'),
    ('b', 3, 'g')
]
df = sqlContext.createDataFrame(data, ["A", "B", "C"])
df.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  5|  c|
|  a|  8|  d|
|  a|  7|  e|
|  b|  1|  f|
|  b|  3|  g|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)

可以按以下方式选择B列按A列的最大值:

df.groupBy('A').agg(f.max('B')
+---+---+
|  A|  B|
+---+---+
|  a|  8|
|  b|  3|
+---+---+
Run Code Online (Sandbox Code Playgroud)

使用此表达式作为左半联接的右侧,并将获得的列重命名max(B)为其原始名称B,我们可以获得所需的结果:

df.join(df.groupBy('A').agg(f.max('B').alias('B')),on='B',how='leftsemi').show()
+---+---+---+
|  B|  A|  C|
+---+---+---+
|  3|  b|  g|
|  8|  a|  d|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)

该解决方案背后的物理计划与公认的答案中的一个是不同的,我仍然不清楚哪一个在大型数据帧上会表现更好。

使用spark SQL语法可以得到相同的结果:

df.registerTempTable('table')
q = '''SELECT *
FROM table a LEFT SEMI
JOIN (
    SELECT 
        A,
        max(B) as max_B
    FROM table
    GROUP BY A
    ) t
ON a.A=t.A AND a.B=t.max_B
'''
sqlContext.sql(q).show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  b|  3|  g|
|  a|  8|  d|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)