Pivot and aggregate a PySpark DataFrame with aliases

Ama*_*nda 0 alias pivot aggregate-functions apache-spark-sql pyspark

I have a PySpark DataFrame similar to this one:

df = sc.parallelize([
    ("c1", "A", 3.4, 0.4, 3.5), 
    ("c1", "B", 9.6, 0.0, 0.0),
    ("c1", "A", 2.8, 0.4, 0.3),
    ("c1", "B", 5.4, 0.2, 0.11),
    ("c2", "A", 0.0, 9.7, 0.3), 
    ("c2", "B", 9.6, 8.6, 0.1),
    ("c2", "A", 7.3, 9.1, 7.0),
    ("c2", "B", 0.7, 6.4, 4.3)
]).toDF(["user_id", "type", "d1", 'd2', 'd3'])
df.show()

Which gives:

+-------+----+---+---+----+
|user_id|type| d1| d2|  d3|
+-------+----+---+---+----+
|     c1|   A|3.4|0.4| 3.5|
|     c1|   B|9.6|0.0| 0.0|
|     c1|   A|2.8|0.4| 0.3|
|     c1|   B|5.4|0.2|0.11|
|     c2|   A|0.0|9.7| 0.3|
|     c2|   B|9.6|8.6| 0.1|
|     c2|   A|7.3|9.1| 7.0|
|     c2|   B|0.7|6.4| 4.3|
+-------+----+---+---+----+

I have pivoted it by the type column, aggregating the results with sum():

data_wide = df.groupBy('user_id')\
.pivot('type').sum()
data_wide.show()

Which gives:

 +-------+-----------------+------------------+-----------+------------------+-----------+------------------+
|user_id|      A_sum(`d1`)|       A_sum(`d2`)|A_sum(`d3`)|       B_sum(`d1`)|B_sum(`d2`)|       B_sum(`d3`)|
+-------+-----------------+------------------+-----------+------------------+-----------+------------------+
|     c1|6.199999999999999|               0.8|        3.8|              15.0|        0.2|              0.11|
|     c2|              7.3|18.799999999999997|        7.3|10.299999999999999|       15.0|4.3999999999999995|
+-------+-----------------+------------------+-----------+------------------+-----------+------------------+

Now, the resulting column names contain the ` (backtick) character, which is a problem when, for example, feeding these new columns into a VectorAssembler, because it returns a syntax error in attribute name. For this reason I need to rename the columns, but calling withColumnRenamed inside a loop or inside a reduce(lambda...) function (see the sketch below) takes a very long time (my actual df has 11,520 columns).
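For reference, the slow approach I mean looks roughly like the following sketch (a hypothetical illustration only; every withColumnRenamed call returns a new DataFrame, so with thousands of columns this becomes very slow):

from functools import reduce

# Hypothetical sketch: rename every pivoted column one at a time,
# here simply stripping the problematic backticks from each name.
def strip_backticks(df):
    return reduce(
        lambda d, c: d.withColumnRenamed(c, c.replace("`", "")),
        df.columns,
        df
    )

data_wide_clean = strip_backticks(data_wide)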

Is there any way to avoid this character in the pivot+aggregation step itself, or to assign aliases that depend on the new pivoted column names?

Thanks in advance.

pau*_*ult 7

You can do the renaming within the aggregation for the pivot, using alias:

import pyspark.sql.functions as f
data_wide = df.groupBy('user_id')\
    .pivot('type')\
    .agg(*[f.sum(x).alias(x) for x in df.columns if x not in {"user_id", "type"}])
data_wide.show()
#+-------+-----------------+------------------+----+------------------+----+------------------+
#|user_id|             A_d1|              A_d2|A_d3|              B_d1|B_d2|              B_d3|
#+-------+-----------------+------------------+----+------------------+----+------------------+
#|     c1|6.199999999999999|               0.8| 3.8|              15.0| 0.2|              0.11|
#|     c2|              7.3|18.799999999999997| 7.3|10.299999999999999|15.0|4.3999999999999995|
#+-------+-----------------+------------------+----+------------------+----+------------------+
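With the backtick-free names, the pivoted columns can now be fed to a VectorAssembler without the "syntax error in attribute name" mentioned in the question. A minimal sketch (assuming the data_wide frame from above; the feature_cols and assembled names are just illustrative):

from pyspark.ml.feature import VectorAssembler

# All pivoted metric columns except the grouping key become features.
feature_cols = [c for c in data_wide.columns if c != "user_id"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled = assembler.transform(data_wide)
assembled.select("user_id", "features").show(truncate=False)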

However, this is really no different from doing the pivot and renaming afterwards. Here is the execution plan for this method (via data_wide.explain()):

#== Physical Plan ==
#HashAggregate(keys=[user_id#0], functions=[pivotfirst(type#1, sum(`d1`) AS `d1`#169, A, B, 0, 0), pivotfirst(type#1, sum(`d2`) 
#AS `d2`#170, A, B, 0, 0), pivotfirst(type#1, sum(`d3`) AS `d3`#171, A, B, 0, 0)])
#+- Exchange hashpartitioning(user_id#0, 200)
#   +- HashAggregate(keys=[user_id#0], functions=[partial_pivotfirst(type#1, sum(`d1`) AS `d1`#169, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d2`) AS `d2`#170, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d3`) AS `d3`#171, A, B, 0, 0)])
#      +- *HashAggregate(keys=[user_id#0, type#1], functions=[sum(d1#2), sum(d2#3), sum(d3#4)])
#         +- Exchange hashpartitioning(user_id#0, type#1, 200)
#            +- *HashAggregate(keys=[user_id#0, type#1], functions=[partial_sum(d1#2), partial_sum(d2#3), partial_sum(d3#4)])
#               +- Scan ExistingRDD[user_id#0,type#1,d1#2,d2#3,d3#4]

Compare this with the approach from this answer:

import re

def clean_names(df):
    # Strip the aggregate wrapper (and the backticks) from the pivoted names,
    # e.g. "A_sum(`d1`)" -> "A_d1"
    p = re.compile(r"^(\w+?)_([a-z]+)\(`?(\w+)`?\)(?:\(\))?")
    return df.toDF(*[p.sub(r"\1_\3", c) for c in df.columns])

pivoted = df.groupBy('user_id').pivot('type').sum()
clean_names(pivoted).explain()
#== Physical Plan ==
#HashAggregate(keys=[user_id#0], functions=[pivotfirst(type#1, sum(`d1`)#363, A, B, 0, 0), pivotfirst(type#1, sum(`d2`)#364, A, B, 0, 0), pivotfirst(type#1, sum(`d3`)#365, A, B, 0, 0)])
#+- Exchange hashpartitioning(user_id#0, 200)
#   +- HashAggregate(keys=[user_id#0], functions=[partial_pivotfirst(type#1, sum(`d1`)#363, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d2`)#364, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d3`)#365, A, B, 0, 0)])
#      +- *HashAggregate(keys=[user_id#0, type#1], functions=[sum(d1#2), sum(d2#3), sum(d3#4)])
#         +- Exchange hashpartitioning(user_id#0, type#1, 200)
#            +- *HashAggregate(keys=[user_id#0, type#1], functions=[partial_sum(d1#2), partial_sum(d2#3), partial_sum(d3#4)])
#               +- Scan ExistingRDD[user_id#0,type#1,d1#2,d2#3,d3#4]
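As a quick sanity check (a small sketch, assuming the data_wide and pivoted frames defined above), both approaches should end up with exactly the same column names:

# Expected to print True: alias-in-agg and rename-afterwards yield the same names.
print(sorted(data_wide.columns) == sorted(clean_names(pivoted).columns))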

You will see that the two are effectively identical. You may get a tiny speedup by avoiding the regular expression, but it will be negligible compared to the cost of the pivot itself.