Divide a PySpark DataFrame column by a column in another PySpark DataFrame when the ID matches

Tre*_*ury 9 python pyspark spark-dataframe

I have a PySpark DataFrame, df1, that looks like:

CustomerID  CustomerValue
12          .17
14          .15
14          .25
17          .50
17          .01
17          .35

I have a second PySpark DataFrame, df2, which is df1 grouped by CustomerID and aggregated with the sum function. It looks like this:

 CustomerID  CustomerValueSum
 12          .17
 14          .40
 17          .86

I want to add a third column to df1 that is df1['CustomerValue'] divided by df2['CustomerValueSum'] for the same CustomerID. The result would look like:

CustomerID  CustomerValue  NormalizedCustomerValue
12          .17            1.00
14          .15            .38
14          .25            .62
17          .50            .58
17          .01            .01
17          .35            .41

In other words, I'm trying to convert this Python/Pandas code to PySpark:

normalized_list = []
for idx, row in df1.iterrows():
    (
        normalized_list
        .append(
            row.CustomerValue / df2[df2.CustomerID == row.CustomerID].CustomerValueSum
        )
    )
df1['NormalizedCustomerValue'] = [val.values[0] for val in normalized_list]

How can I do this?

小智 11

Code:

import pyspark.sql.functions as F

df1 = df1\
    .join(df2, "CustomerID")\
    .withColumn("NormalizedCustomerValue", (F.col("CustomerValue") / F.col("CustomerValueSum")))\
    .drop("CustomerValueSum")

Output:

df1.show()

+----------+-------------+-----------------------+
|CustomerID|CustomerValue|NormalizedCustomerValue|
+----------+-------------+-----------------------+
|        17|          0.5|     0.5813953488372093|
|        17|         0.01|   0.011627906976744186|
|        17|         0.35|     0.4069767441860465|
|        12|         0.17|                    1.0|
|        14|         0.15|    0.37499999999999994|
|        14|         0.25|                  0.625|
+----------+-------------+-----------------------+
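
For readers who want to check the logic without a Spark session, the join-then-divide step can be sketched in plain Python. This only illustrates the semantics and is not how Spark executes the query; `df1_rows` and `df2_sums` are hypothetical stand-ins for the two DataFrames.

```python
# Plain-Python sketch of "join on CustomerID, then divide" (no Spark involved).
# df1_rows and df2_sums are hypothetical stand-ins for df1 and df2.
df1_rows = [(12, 0.17), (14, 0.15), (14, 0.25), (17, 0.50), (17, 0.01), (17, 0.35)]
df2_sums = {12: 0.17, 14: 0.40, 17: 0.86}  # CustomerID -> CustomerValueSum

# Inner-join semantics: a row whose CustomerID is missing from df2_sums is dropped.
normalized = [
    (cid, value, value / df2_sums[cid])
    for cid, value in df1_rows
    if cid in df2_sums
]

for row in normalized:
    print(row)
```

Because the join is an inner join by default, any CustomerID that is present in df1 but absent from df2 would disappear from the result. That cannot happen here, since df2 is derived from df1.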


小智 5

This can also be done with a Spark Window function, so you don't need to create a separate DataFrame holding the aggregated values (df2):

Create the data for the input DataFrame:

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

data =[(12, 0.17), (14, 0.15), (14, 0.25), (17, 0.5), (17, 0.01), (17, 0.35)]
df1 = sqlContext.createDataFrame(data, ['CustomerID', 'CustomerValue'])
df1.show()
+----------+-------------+
|CustomerID|CustomerValue|
+----------+-------------+
|        12|         0.17|
|        14|         0.15|
|        14|         0.25|
|        17|          0.5|
|        17|         0.01|
|        17|         0.35|
+----------+-------------+

Define a Window partitioned by CustomerID and divide each value by the sum over its partition:

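from pyspark.sql import Window
import pyspark.sql.functions as F  # avoids shadowing Python's built-in sum

# All rows with the same CustomerID fall into the same window partition.
w = Window.partitionBy('CustomerID')

# Divide each value by its partition total; orderBy only makes the output easier to read.
df2 = df1.withColumn(
    'NormalizedCustomerValue',
    F.col('CustomerValue') / F.sum('CustomerValue').over(w)
).orderBy('CustomerID')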

df2.show()
+----------+-------------+-----------------------+
|CustomerID|CustomerValue|NormalizedCustomerValue|
+----------+-------------+-----------------------+
|        12|         0.17|                    1.0|
|        14|         0.15|    0.37499999999999994|
|        14|         0.25|                  0.625|
|        17|          0.5|     0.5813953488372093|
|        17|         0.01|   0.011627906976744186|
|        17|         0.35|     0.4069767441860465|
+----------+-------------+-----------------------+
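
The window expression can be read as "for each row, divide by the sum of its partition". A rough plain-Python equivalent, assuming the same six rows, is sketched below. `rows`, `partition_sums` and `check` are illustrative names, and Spark's distributed execution works differently under the hood.

```python
from collections import defaultdict

rows = [(12, 0.17), (14, 0.15), (14, 0.25), (17, 0.50), (17, 0.01), (17, 0.35)]

# First pass: total CustomerValue per CustomerID (the "partition" sum).
partition_sums = defaultdict(float)
for cid, value in rows:
    partition_sums[cid] += value

# Second pass: divide each value by the total of its own partition.
normalized = [(cid, value, value / partition_sums[cid]) for cid, value in rows]

# Sanity check: within each partition the shares should add up to about 1.
check = defaultdict(float)
for cid, _, share in normalized:
    check[cid] += share
```

Unlike the join-based answer, this needs no pre-aggregated table: the per-customer totals are computed from the same rows that are being normalized.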