从pyspark数据框中减去平均值

Bal*_*a13 1 apache-spark pyspark spark-dataframe

我正在尝试计算数据框中每一列的平均值,并从该列中的每个元素中减去。我创建了一个尝试这样做的函数,但是当我尝试使用 UDF 实现它时,出现错误:“float”对象没有属性“map”。关于如何创建这样一个功能的任何想法?谢谢!

def normalize(data):
        average=data.map(lambda x: x[0]).sum()/data.count()
        out=data.map(lambda x: (x-average))
        return out

mapSTD=udf(normalize,IntegerType())     
dats = data.withColumn('Normalized', mapSTD('Fare'))
Run Code Online (Sandbox Code Playgroud)

Pio*_*ski 5

在您的示例中,UDF 函数存在问题,无法应用于行和整个 DataFrame。UDF 只能应用于单行,但 Spark 还支持在整个 DataFrame 上实现 UDAF(用户定义的聚合函数)。

要解决您的问题,您可以使用以下功能:

from pyspark.sql.functions import mean

def normalize(df, column):
    average = df.agg(mean(df[column]).alias("mean")).collect()[0]["mean"]
    return df.select(df[column] - average)
Run Code Online (Sandbox Code Playgroud)

像这样使用它:

normalize(df, "Fare")
Run Code Online (Sandbox Code Playgroud)

请注意,以上仅适用于单列,但可以实现更通用的内容:

def normalize(df, columns):
    selectExpr = []
    for column in columns:
       average = df.agg(mean(df[column]).alias("mean")).collect()[0]["mean"]
       selectExpr.append(df[column] - average)
    return df.select(selectExpr)
Run Code Online (Sandbox Code Playgroud)

像这样使用它:

normalize(df, ["col1", "col2"])
Run Code Online (Sandbox Code Playgroud)

这是可行的,但您需要为每一列运行聚合,因此多列性能可能会出现问题,但可以只生成一个聚合表达式:

def normalize(df, columns):
    aggExpr = []
    for column in columns:
        aggExpr.append(mean(df[column]).alias(column))
    averages = df.agg(*aggExpr).collect()[0]
    selectExpr = []
    for column in columns:
        selectExpr.append(df[column] - averages[column])
    return df.select(selectExpr)
Run Code Online (Sandbox Code Playgroud)