Tre*_*ury (score 9) · python · pyspark · spark-dataframe
I have a PySpark DataFrame, df1, that looks like:
CustomerID CustomerValue
12 .17
14 .15
14 .25
17 .50
17 .01
17 .35
I have a second PySpark DataFrame, df2, which is df1 grouped by CustomerID and aggregated with the sum function. It looks like this:
CustomerID CustomerValueSum
12 .17
14 .40
17 .86
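(For reference, df2 can be built from df1 with a groupBy/agg; a minimal sketch, where the alias CustomerValueSum is assumed from the column name shown above:)

import pyspark.sql.functions as F

# Sum CustomerValue per customer and name the result CustomerValueSum
df2 = df1.groupBy("CustomerID").agg(F.sum("CustomerValue").alias("CustomerValueSum"))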
I want to add a third column to df1 that is df1['CustomerValue'] divided by df2['CustomerValueSum'] for the matching CustomerID. It would look like:
CustomerID CustomerValue NormalizedCustomerValue
12 .17 1.00
14 .15 .38
14 .25 .62
17 .50 .58
17 .01 .01
17 .35 .41
In other words, I'm trying to translate this Python/pandas code to PySpark:
normalized_list = []
for idx, row in df1.iterrows():
    (
        normalized_list
        .append(
            row.CustomerValue / df2[df2.CustomerID == row.CustomerID].CustomerValueSum
        )
    )
df1['NormalizedCustomerValue'] = [val.values[0] for val in normalized_list]
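(As an aside, the same pandas result can be computed without the row loop by broadcasting the group sums back onto df1 with groupby().transform; a sketch of the equivalent, not part of the original attempt:)

# Per-row division by that row's group sum, aligned by index
df1['NormalizedCustomerValue'] = df1['CustomerValue'] / df1.groupby('CustomerID')['CustomerValue'].transform('sum')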
How can I accomplish this?
小智 (score 11)
Code:
import pyspark.sql.functions as F

# Join the per-customer sums from df2 onto df1, divide, then drop the helper column
df1 = df1\
    .join(df2, "CustomerID")\
    .withColumn("NormalizedCustomerValue", F.col("CustomerValue") / F.col("CustomerValueSum"))\
    .drop("CustomerValueSum")
Output:
df1.show()
+----------+-------------+-----------------------+
|CustomerID|CustomerValue|NormalizedCustomerValue|
+----------+-------------+-----------------------+
| 17| 0.5| 0.5813953488372093|
| 17| 0.01| 0.011627906976744186|
| 17| 0.35| 0.4069767441860465|
| 12| 0.17| 1.0|
| 14| 0.15| 0.37499999999999994|
| 14| 0.25| 0.625|
+----------+-------------+-----------------------+
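Note that .join(df2, "CustomerID") performs an inner join by default, so any df1 rows whose CustomerID does not appear in df2 are silently dropped. To keep such rows (with a null NormalizedCustomerValue), the join type can be passed explicitly; a minimal sketch:

df1 = df1.join(df2, "CustomerID", "left")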
小智 (score 5)
This can also be achieved with a Spark window function, so you don't need to create a separate DataFrame holding the aggregated values (df2):
Create the data for the input DataFrame:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
data =[(12, 0.17), (14, 0.15), (14, 0.25), (17, 0.5), (17, 0.01), (17, 0.35)]
df1 = sqlContext.createDataFrame(data, ['CustomerID', 'CustomerValue'])
df1.show()
+----------+-------------+
|CustomerID|CustomerValue|
+----------+-------------+
| 12| 0.17|
| 14| 0.15|
| 14| 0.25|
| 17| 0.5|
| 17| 0.01|
| 17| 0.35|
+----------+-------------+
Define a Window partitioned by CustomerID:
from pyspark.sql import Window
from pyspark.sql.functions import sum  # note: this shadows Python's builtin sum

# Compute each customer's total over the window, then divide row values by it
w = Window.partitionBy('CustomerID')
df2 = df1.withColumn('NormalizedCustomerValue', df1.CustomerValue/sum(df1.CustomerValue).over(w)).orderBy('CustomerID')
df2.show()
+----------+-------------+-----------------------+
|CustomerID|CustomerValue|NormalizedCustomerValue|
+----------+-------------+-----------------------+
| 12| 0.17| 1.0|
| 14| 0.15| 0.37499999999999994|
| 14| 0.25| 0.625|
| 17| 0.5| 0.5813953488372093|
| 17| 0.01| 0.011627906976744186|
| 17| 0.35| 0.4069767441860465|
+----------+-------------+-----------------------+
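(Side note: on Spark 2.x and later, HiveContext is deprecated in favor of SparkSession; the input DataFrame above can be created the same way through a session. A minimal sketch, assuming a default local session:)

from pyspark.sql import SparkSession

# Reuse an existing session or start one with default settings
spark = SparkSession.builder.getOrCreate()
data = [(12, 0.17), (14, 0.15), (14, 0.25), (17, 0.5), (17, 0.01), (17, 0.35)]
df1 = spark.createDataFrame(data, ['CustomerID', 'CustomerValue'])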