I am trying to normalize columns in a Spark DataFrame using Python.
My dataset:
------------------------------------
userID | Name | Revenue | No.of.Days
------------------------------------
1      | A    | 12560   | 45
2      | B    | 2312890 | 90
...    | ...  | ...     | ...
------------------------------------
In this dataset, I have to normalize every column except userID and Name, i.e. Revenue and No.of.Days.
The output should look like this:
------------------------------------
userID | Name | Revenue | No.of.Days
------------------------------------
1      | A    | 0.5     | 0.5
2      | B    | 0.9     | 1
...    | ...  | 1       | 0.4
...    | ...  | 0.6     | ...
...    | ...  | ...     | ...
------------------------------------
The formula used to normalize the values in each column is:

val = (e_i - min) / (max - min)

where
  e_i = column value at the i-th position
  min = minimum value in that column
  max = maximum value in that column
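For example (with illustrative numbers, not taken from the data above): if a column's minimum is 10 and its maximum is 110, a value of 60 is scaled to (60 - 10) / (110 - 10) = 0.5.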
How can I do this easily using PySpark?
I hope the following code meets your requirement. MinMaxScaler in pyspark.ml only operates on Vector columns, so each column to be scaled is first converted to a vector with VectorAssembler, scaled, and then turned back into a plain double column.

Code:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

df = spark.createDataFrame([(1, 'A', 12560, 45),
                            (1, 'B', 42560, 90),
                            (1, 'C', 31285, 120),
                            (1, 'D', 10345, 150)],
                           ["userID", "Name", "Revenue", "No_of_Days"])

print("Before Scaling :")
df.show(5)

# UDF for converting column type from vector to double type
unlist = udf(lambda x: round(float(list(x)[0]), 3), DoubleType())

# Iterating over columns to be scaled
for i in ["Revenue", "No_of_Days"]:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i], outputCol=i + "_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i + "_Vect", outputCol=i + "_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe, converting the scaled vector back to
    # a double and dropping the intermediate vector column
    df = (pipeline.fit(df).transform(df)
                  .withColumn(i + "_Scaled", unlist(i + "_Scaled"))
                  .drop(i + "_Vect"))

print("After Scaling :")
df.show(5)
Output:
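As a side note (not part of the original answer), the same (e_i - min) / (max - min) formula can also be applied without MLlib at all, using only aggregate functions from pyspark.sql.functions. A minimal sketch, assuming a fresh df with the same Revenue and No_of_Days columns as above:

from pyspark.sql import functions as F

for c in ["Revenue", "No_of_Days"]:
    # Compute the column's min and max once ...
    stats = df.agg(F.min(c).alias("mn"), F.max(c).alias("mx")).first()
    mn, mx = stats["mn"], stats["mx"]
    # ... then apply (e_i - min) / (max - min), rounded to 3 decimals
    # (assumes mx != mn; a constant column would divide by zero)
    df = df.withColumn(c + "_Scaled", F.round((F.col(c) - mn) / (mx - mn), 3))

df.show(5)

This avoids the per-column pipeline fitting and the vector-to-double UDF, at the cost of two extra aggregation jobs per column.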