在 PySpark 中,存在这样的概念coalesce(colA, colB, ...):每行将从这些列中获取它遇到的第一个非空值。但是,我希望coalesce(rowA, rowB, ...)能够在每列中获取它从这些行中遇到的第一个非空值。我想合并一组或行窗口中的所有行。
例如,给定以下数据集,我想合并每个类别的行并按日期升序排列。
+---------+-----------+------+------+
| category| date| val1| val2|
+---------+-----------+------+------+
| A| 2020-05-01| null| 1|
| A| 2020-05-02| 2| null|
| A| 2020-05-03| 3| null|
| B| 2020-05-01| null| null|
| B| 2020-05-02| 4| null|
| C| 2020-05-01| 5| 2|
| C| 2020-05-02| null| 3|
| D| 2020-05-01| null| 4|
+---------+-----------+------+------+
Run Code Online (Sandbox Code Playgroud)
我应该得到的输出是......
+---------+-----------+------+------+
| category| date| val1| val2|
+---------+-----------+------+------+
| A| 2020-05-01| 2| 1|
| B| 2020-05-01| 4| null|
| C| 2020-05-01| 5| 2|
| D| 2020-05-01| null| 4|
+---------+-----------+------+------+
Run Code Online (Sandbox Code Playgroud)
首先我来给出答案。然后,我会指出重要的部分。
from pyspark.sql import Window
from pyspark.sql.functions import col, dense_rank, first
df = ... # dataframe from question description
window = (
Window
.partitionBy("category")
.orderBy(col("date").asc())
)
window_unbounded = (
window
.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
cols_to_merge = [col for col in df.columns if col not in ["category", "date"]]
merged_cols = [first(col, True).over(window_unbounded).alias(col) for col in cols_to_merge]
df_merged = (
df
.select([col("category"), col("date")] + merged_cols)
.withColumn("rank_col", dense_rank().over(window))
.filter(col("rank_col") == 1)
.drop("rank_col")
)
Run Code Online (Sandbox Code Playgroud)
与coalesce类似的按行是聚合函数first。具体来说,我们使用first withignorenulls = True来找到第一个非空值。
当我们使用first时,我们必须小心它所应用到的行的顺序。因为groupBy不允许我们维护组内的顺序,所以我们使用Window。
窗口本身必须在两端都是无界的,而不是当前 row 之前的默认无界,否则我们最终first可能会在组的子集上运行聚合。
在窗口上聚合后,我们将列别名回其原始名称,以保持列名称一致。
我们使用cols的单个select语句而不是 for 循环 with,因为 select 语句大大减少了查询计划深度。如果您使用循环的withColumn,如果列太多,您可能会遇到堆栈溢出错误。df.withColumn(col, ...)
最后,我们对dense_rank窗口运行一次——这次使用具有默认范围的窗口——并仅过滤排名第一的行。我们在这里使用密集排名,但我们可以使用任何排名函数,只要适合我们的需要即可。
| 归档时间: |
|
| 查看次数: |
5173 次 |
| 最近记录: |