MAR*_*ARK 5 python apache-spark pyspark
我正在尝试计算 pyspark 中的加权平均值,但没有取得很大进展
# Example data
df = sc.parallelize([
("a", 7, 1), ("a", 5, 2), ("a", 4, 3),
("b", 2, 2), ("b", 5, 4), ("c", 1, -1)
]).toDF(["k", "v1", "v2"])
df.show()
import numpy as np
def weighted_mean(workclass, final_weight):
return np.average(workclass, weights=final_weight)
weighted_mean_udaf = pyspark.sql.functions.udf(weighted_mean,
pyspark.sql.types.IntegerType())
Run Code Online (Sandbox Code Playgroud)
但是当我尝试执行这段代码时
df.groupby('k').agg(weighted_mean_udaf(df.v1,df.v2)).show()
Run Code Online (Sandbox Code Playgroud)
我收到错误
u"expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get
Run Code Online (Sandbox Code Playgroud)
我的问题是,我可以指定一个自定义函数(采用多个参数)作为 agg 的参数吗?如果没有,是否有其他方法可以在按键分组后执行加权平均值等操作?
用户定义的聚合函数(UDAF,可在 pyspark 上运行pyspark.sql.GroupedData,但在 pyspark 中不受支持)与用户定义函数(UDF,可在 pyspark 上运行)不同pyspark.sql.DataFrame。
因为在 pyspark 中你无法创建自己的 UDAF,并且提供的 UDAF 无法解决你的问题,所以你可能需要回到 RDD 世界:
from numpy import sum
def weighted_mean(vals):
vals = list(vals) # save the values from the iterator
sum_of_weights = sum(tup[1] for tup in vals)
return sum(1. * tup[0] * tup[1] / sum_of_weights for tup in vals)
df.rdd.map(
lambda x: (x[0], tuple(x[1:])) # reshape to (key, val) so grouping could work
).groupByKey().mapValues(
weighted_mean
).collect()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3822 次 |
| 最近记录: |