PySpark group中的中位数/分位数

abe*_*bop 28 apache-spark apache-spark-sql pyspark pyspark-sql

我想在Spark数据帧上计算组分位数(使用PySpark).近似或精确的结果都可以.我更喜欢在groupBy/ 的上下文中使用的解决方案agg,以便我可以将它与其他PySpark聚合函数混合使用.如果由于某种原因这是不可能的,那么不同的方法也可以.

这个问题是相关的,但没有说明如何approxQuantile用作聚合函数.

我也可以访问percentile_approxHive UDF,但我不知道如何将它用作聚合函数.

为了特异性,假设我有以下数据帧:

from pyspark import SparkContext
import pyspark.sql.functions as f

sc = SparkContext()    

df = sc.parallelize([
    ['A', 1],
    ['A', 2],
    ['A', 3],
    ['B', 4],
    ['B', 5],
    ['B', 6],
]).toDF(('grp', 'val'))

df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()
Run Code Online (Sandbox Code Playgroud)

预期结果是:

+----+-------+
| grp|med_val|
+----+-------+
|   A|      2|
|   B|      5|
+----+-------+
Run Code Online (Sandbox Code Playgroud)

kae*_*ael 35

我想你不再需要它了.但是会留给后代(即下周我忘记的时候).

from pyspark.sql import Window
import pyspark.sql.functions as F

grp_window = Window.partitionBy('grp')
magic_percentile = F.expr('percentile_approx(val, 0.5)')

df.withColumn('med_val', magic_percentile.over(grp_window))
Run Code Online (Sandbox Code Playgroud)

或者为了准确地解决您的问题,这也有效:

df.groupBy('gpr').agg(magic_percentile.alias('med_val'))
Run Code Online (Sandbox Code Playgroud)

作为奖励,您可以传递一系列百分位数:

quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')
Run Code Online (Sandbox Code Playgroud)

你会得到一个清单作为回报.

  • 在 Spark 3.1.0 中,现在可以直接在 PySpark groupby 聚合中使用 `percentile_approx`: `df.groupBy("key").agg(percentile_approx("value", 0.5, lit(1000000)).alias("median “))` https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.percentile_approx.html (6认同)
  • @thentangler:前者是精确的百分位,对于大型数据集来说不是可扩展的操作,后者是近似的但可扩展的。 (2认同)

Sha*_*ica 10

由于您可以访问percentile_approx,一个简单的解决方案是在SQL命令中使用它:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df.registerTempTable("df")
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp")
Run Code Online (Sandbox Code Playgroud)

  • 我在问题中澄清了我理想的解决方案.很明显,这个答案可以胜任,但这并不是我想要的.我会暂时搁置这个问题,看看是否有更清晰的答案. (3认同)

Jan*_*azz 9

似乎可以通过pyspark >= 3.1.0使用完全解决percentile_approx

import pyspark.sql.functions as func    

df.groupBy("grp").agg(func.percentile_approx("val", 0.5).alias("median"))
Run Code Online (Sandbox Code Playgroud)

有关更多信息,请参阅: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.percentile_approx.html


des*_*aut 8

不幸的是,据我所知,似乎不可能用"纯粹的"PySpark命令(Shaido的解决方案提供SQL解决方案)这样做,原因很简单:与其他聚合形成对比函数,例如mean,approxQuantile不返回Column类型,而是返回列表.

让我们看一下示例数据的快速示例:

spark.version
# u'2.2.0'

import pyspark.sql.functions as func
from pyspark.sql import DataFrameStatFunctions as statFunc

# aggregate with mean works OK:
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val'))
df_grp_mean.show()
# +---+--------+ 
# |grp|mean_val|
# +---+--------+
# |  B|     5.0|
# |  A|     2.0|
# +---+--------+

# try aggregating by median:
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1))
# AssertionError: all exprs should be Column

# mean aggregation is a Column, but median is a list:

type(func.mean(df['val']))
# pyspark.sql.column.Column

type(statFunc(df).approxQuantile('val', [0.5], 0.1))
# list
Run Code Online (Sandbox Code Playgroud)

我怀疑基于窗口的方法会有什么不同,因为我说的根本原因是一个非常基本的原因.

有关更多详细信息,请参阅我的答案.


Kix*_*oms 5

最简单的方法pyspark==2.4.5是:

df \
    .groupby('grp') \
    .agg(expr('percentile(val, array(0.5))')[0].alias('50%')) \
    .show()

Run Code Online (Sandbox Code Playgroud)

输出:

|grp|50%|
+---+---+
|  B|5.0|
|  A|2.0|
+---+---+
Run Code Online (Sandbox Code Playgroud)