如何合并 pyspark 中的行?

Dav*_*hao 3 pyspark

在 PySpark 中,存在这样的概念coalesce(colA, colB, ...):每行将从这些列中获取它遇到的第一个非空值。但是,我希望coalesce(rowA, rowB, ...)能够在每列中获取它从这些行中遇到的第一个非空值。我想合并一组或行窗口中的所有行。

例如,给定以下数据集,我想合并每个类别的行并按日期升序排列。

+---------+-----------+------+------+
| category|       date|  val1|  val2|
+---------+-----------+------+------+
|        A| 2020-05-01|  null|     1|
|        A| 2020-05-02|     2|  null|
|        A| 2020-05-03|     3|  null|
|        B| 2020-05-01|  null|  null|
|        B| 2020-05-02|     4|  null|
|        C| 2020-05-01|     5|     2|
|        C| 2020-05-02|  null|     3|
|        D| 2020-05-01|  null|     4|
+---------+-----------+------+------+
Run Code Online (Sandbox Code Playgroud)

我应该得到的输出是......

+---------+-----------+------+------+
| category|       date|  val1|  val2|
+---------+-----------+------+------+
|        A| 2020-05-01|     2|     1|
|        B| 2020-05-01|     4|  null|
|        C| 2020-05-01|     5|     2|
|        D| 2020-05-01|  null|     4|
+---------+-----------+------+------+
Run Code Online (Sandbox Code Playgroud)

Dav*_*hao 5

首先我来给出答案。然后,我会指出重要的部分。

from pyspark.sql import Window
from pyspark.sql.functions import col, dense_rank, first

df = ...  # dataframe from question description

window = (
    Window
    .partitionBy("category")
    .orderBy(col("date").asc())
)
window_unbounded = (
    window
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

cols_to_merge = [col for col in df.columns if col not in ["category", "date"]]
merged_cols = [first(col, True).over(window_unbounded).alias(col) for col in cols_to_merge]
df_merged = (
    df
    .select([col("category"), col("date")] + merged_cols)
    .withColumn("rank_col", dense_rank().over(window))
    .filter(col("rank_col") == 1)
    .drop("rank_col")
)
Run Code Online (Sandbox Code Playgroud)

与coalesce类似的按行是聚合函数first。具体来说,我们使用first withignorenulls = True来找到第一个非空值。

当我们使用first时,我们必须小心它所应用到的行的顺序。因为groupBy不允许我们维护组内的顺序,所以我们使用Window

窗口本身必须在两端都是无界的,而不是当前 row 之前的默认无界,否则我们最终first可能会在组的子集上运行聚合。

在窗口上聚合后,我们将列别名回其原始名称,以保持列名称一致。

我们使用cols的单个select语句而不是 for 循环 with,因为 select 语句大大减少了查询计划深度。如果您使用循环的withColumn,如果列太多,您可能会遇到堆栈溢出错误。df.withColumn(col, ...)

最后,我们对dense_rank窗口运行一次——这次使用具有默认范围的窗口——并仅过滤排名第一的行。我们在这里使用密集排名,但我们可以使用任何排名函数,只要适合我们的需要即可。