在SPARK Dataframe中缩放(标准化)列-Pyspark

Jac*_*iel 5 python apache-spark pyspark

我正在尝试使用python标准化SPARK DataFrame中的列。

我的数据集:

--------------------------
userID|Name|Revenue|No.of.Days|
--------------------------
1      A     12560    45
2      B     2312890  90
.      .       .       .
.      .       .       .
.      .       .       .
--------------------------
Run Code Online (Sandbox Code Playgroud)

在此数据集中,除了userID和Name外,我必须对收入和天数进行标准化。

输出应如下所示


userID|Name|Revenue|No.of.Days|
--------------------------
1      A     0.5     0.5
2      B     0.9       1
.      .       1     0.4
.      .     0.6       .
.      .       .       .
--------------------------
Run Code Online (Sandbox Code Playgroud)

用于计算或标准化各列中的值的公式为

val = (ei-min)/(max-min)
ei = column value at i th position
min = min value in that column
max = max value in that column
Run Code Online (Sandbox Code Playgroud)

如何使用PySpark轻松完成此操作?

Rin*_*haj 21

希望以下代码满足您的要求。

代码 :

df = spark.createDataFrame([ (1, 'A',12560,45),
                             (1, 'B',42560,90),
                             (1, 'C',31285,120),
                             (1, 'D',10345,150)
                           ], ["userID", "Name","Revenue","No_of_Days"])

print("Before Scaling :")
df.show(5)

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDF for converting column type from vector to double type
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

# Iterating over columns to be scaled
for i in ["Revenue","No_of_Days"]:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")

print("After Scaling :")
df.show(5)
Run Code Online (Sandbox Code Playgroud)

输出:

输出

  • 很棒的答案。但是,对于在缩放后使用“KMeans()”的任何人来说,出于某种奇怪的原因,如果我没有将数据类型保留为“向量”,则会出现错误。使用“StandardScaler()”+“VectorAssembler( )` + `KMeans()` 需要向量类型。即使使用 VectorAssembler 将其转换为向量;如果我执行 float -> 矢量而不是矢量 -> 矢量,我会不断收到提示,表示我的特征向量中有 na/null 值。 (3认同)
  • 规范化“DataFrame”的所有列(也许是数千列)怎么样?似乎迭代要缩放的列仍然很慢。 (2认同)