Bal*_*a13 1 apache-spark pyspark spark-dataframe
我正在尝试计算数据框中每一列的平均值,并从该列中的每个元素中减去。我创建了一个尝试这样做的函数,但是当我尝试使用 UDF 实现它时,出现错误:“float”对象没有属性“map”。关于如何创建这样一个功能的任何想法?谢谢!
def normalize(data):
average=data.map(lambda x: x[0]).sum()/data.count()
out=data.map(lambda x: (x-average))
return out
mapSTD=udf(normalize,IntegerType())
dats = data.withColumn('Normalized', mapSTD('Fare'))
Run Code Online (Sandbox Code Playgroud)
在您的示例中,UDF 函数存在问题,无法应用于行和整个 DataFrame。UDF 只能应用于单行,但 Spark 还支持在整个 DataFrame 上实现 UDAF(用户定义的聚合函数)。
要解决您的问题,您可以使用以下功能:
from pyspark.sql.functions import mean
def normalize(df, column):
average = df.agg(mean(df[column]).alias("mean")).collect()[0]["mean"]
return df.select(df[column] - average)
Run Code Online (Sandbox Code Playgroud)
像这样使用它:
normalize(df, "Fare")
Run Code Online (Sandbox Code Playgroud)
请注意,以上仅适用于单列,但可以实现更通用的内容:
def normalize(df, columns):
selectExpr = []
for column in columns:
average = df.agg(mean(df[column]).alias("mean")).collect()[0]["mean"]
selectExpr.append(df[column] - average)
return df.select(selectExpr)
Run Code Online (Sandbox Code Playgroud)
像这样使用它:
normalize(df, ["col1", "col2"])
Run Code Online (Sandbox Code Playgroud)
这是可行的,但您需要为每一列运行聚合,因此多列性能可能会出现问题,但可以只生成一个聚合表达式:
def normalize(df, columns):
aggExpr = []
for column in columns:
aggExpr.append(mean(df[column]).alias(column))
averages = df.agg(*aggExpr).collect()[0]
selectExpr = []
for column in columns:
selectExpr.append(df[column] - averages[column])
return df.select(selectExpr)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4642 次 |
| 最近记录: |