pyspark approxQuantile功能

BK *_* C. 8 apache-spark apache-spark-sql pyspark pyspark-sql

我有这些列数据框id,price,timestamp.

我想找到按中值分组的中位数值id.

我正在使用此代码来查找它,但它给了我这个错误.

from pyspark.sql import DataFrameStatFunctions as statFunc
windowSpec = Window.partitionBy("id")
median = statFunc.approxQuantile("price",
                                 [0.5],
                                 0) \
                 .over(windowSpec)

return df.withColumn("Median", median)
Run Code Online (Sandbox Code Playgroud)

是否无法DataFrameStatFunctions用于填充新列中的值?

TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
Run Code Online (Sandbox Code Playgroud)

des*_*aut 39

嗯,的确是能够使用approxQuantile一个新的数据框栏填入值,但这不是为什么你收到此错误.不幸的是,整个底层的故事是一个相当令人沮丧的故事,因为我认为许多Spark(特别是PySpark)功能和缺乏足够的文档就是这种情况.

首先,没有一种方法,而是两种 approxQuantile方法; 在第一个是标准的数据框类,即你不需要导入DataFrameStatFunctions的一部分:

spark.version
# u'2.1.1'

sampleData = [("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)]

df = spark.createDataFrame(sampleData, schema=["Name","Role","Salary"])
df.show()
# +------+---------+------+ 
# |  Name|     Role|Salary|
# +------+---------+------+
# |   bob|Developer|125000| 
# |  mark|Developer|108000|
# |  carl|   Tester| 70000|
# | peter|Developer|185000|
# |   jon|   Tester| 65000|
# | roman|   Tester| 82000|
# | simon|Developer| 98000|
# |  eric|Developer|144000|
# |carlos|   Tester| 75000|
# | henry|Developer|110000|
# +------+---------+------+

med = df.approxQuantile("Salary", [0.5], 0.25) # no need to import DataFrameStatFunctions
med
# [98000.0]
Run Code Online (Sandbox Code Playgroud)

第二个其中的一部分DataFrameStatFunctions,但如果您像使用它一样,则会收到您报告的错误:

from pyspark.sql import DataFrameStatFunctions as statFunc
med2 = statFunc.approxQuantile( "Salary", [0.5], 0.25)
# TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
Run Code Online (Sandbox Code Playgroud)

因为正确的用法是

med2 = statFunc(df).approxQuantile( "Salary", [0.5], 0.25)
med2
# [82000.0]
Run Code Online (Sandbox Code Playgroud)

虽然你不能在PySpark文档中找到一个关于这个的简单例子(我花了一些时间自己弄清楚)...最好的部分?这两个值不相等:

med == med2
# False
Run Code Online (Sandbox Code Playgroud)

我怀疑这是由于使用了非确定性算法(毕竟,它应该是一个近似的中位数),即使你用相同的玩具数据重新运行命令,你可能得到不同的值(并且不同于我在这里报告的那些) - 我建议稍微试验一下以获得感觉......

但是,正如我已经说过的,这不是你不能approxQuantile用来填充新数据帧列中的值的原因- 即使你使用正确的语法,你也会得到一个不同的错误:

df2 = df.withColumn('median_salary', statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# AssertionError: col should be Column
Run Code Online (Sandbox Code Playgroud)

这里,col指的是withColumn操作的第二个参数,即approxQuantile一个,并且错误消息表明它不是一个Column类型 - 实际上,它是一个列表:

type(statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# list
Run Code Online (Sandbox Code Playgroud)

因此,在填充列值时,Spark需要类型的参数Column,并且不能使用列表; 这是一个创建一个新列的示例,其中每个角色的平均值代替中间值:

import pyspark.sql.functions as func
from pyspark.sql import Window

windowSpec = Window.partitionBy(df['Role'])
df2 = df.withColumn('mean_salary', func.mean(df['Salary']).over(windowSpec))
df2.show()
# +------+---------+------+------------------+
# |  Name|     Role|Salary|       mean_salary| 
# +------+---------+------+------------------+
# |  carl|   Tester| 70000|           73000.0| 
# |   jon|   Tester| 65000|           73000.0|
# | roman|   Tester| 82000|           73000.0|
# |carlos|   Tester| 75000|           73000.0|
# |   bob|Developer|125000|128333.33333333333|
# |  mark|Developer|108000|128333.33333333333| 
# | peter|Developer|185000|128333.33333333333| 
# | simon|Developer| 98000|128333.33333333333| 
# |  eric|Developer|144000|128333.33333333333|
# | henry|Developer|110000|128333.33333333333| 
# +------+---------+------+------------------+
Run Code Online (Sandbox Code Playgroud)

这是有效的,因为,相反approxQuantile,mean返回一个Column:

type(func.mean(df['Salary']).over(windowSpec))
# pyspark.sql.column.Column
Run Code Online (Sandbox Code Playgroud)


vol*_*lhv 7

计算组中的分位数(聚合)示例

由于组缺少聚合函数,我添加了一个按名称构造函数调用的示例(percentile_approx对于这种情况):

from pyspark.sql.column import Column, _to_java_column, _to_seq

def from_name(sc, func_name, *params):
    """
       create call by function name 
    """
    callUDF = sc._jvm.org.apache.spark.sql.functions.callUDF
    func = callUDF(func_name, _to_seq(sc, *params, _to_java_column))
    return Column(func)
Run Code Online (Sandbox Code Playgroud)

percentile_approx在 groupBy 中应用函数:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# build percentile_approx function call by name: 
target = from_name(sc, "percentile_approx", [f.col("salary"), f.lit(0.95)])


# load dataframe for persons data 
# with columns "person_id", "group_id" and "salary"
persons = spark.read.parquet( ... )

# apply function for each group
persons.groupBy("group_id").agg(
    target.alias("target")).show()
Run Code Online (Sandbox Code Playgroud)