如何在时间段限制和其他条件下使用 PySpark 函数的窗口

Jan*_*ice 5 apache-spark apache-spark-sql pyspark

我可以获得有关如何在 pyspark 中编写此逻辑的帮助吗?

假设我有如附图所示的表格。

因此,给定:日期、用户 ID、访问、分组(旧)作为输入,我想创建一个名为分组(新)的新列,以便执行以下操作:

对于任何给定的用户 ID:

  1. 首先检查一下分组是什么(旧)。如果是!= Bad,则分组(新)=分组(旧)

  2. 如果分组(旧)= Bad,则应用最近日期的最近访问的分组(旧),使其 != Bad

  3. 但是,如果距离上一个日期 != Bad 的最近分组(旧)已经超过 30 天,则使分组(新)= Bad(因为数据已过时)

在此输入图像描述

我尝试过但没有按预期工作:

days = lambda i: i * 86400 

user_30d_tracker = 
  Window.partitionBy("userid")
    .orderBy(f.col("date").cast("timestamp").cast("long"))
    .rangeBetween(-days(30), 0)
    .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)


df = (df.withColumn("Grouping(old)_YN",
      f.when(f.col("Grouping(old)")==f.lit("Bad"), "No")
       .otherwise(f.lit("Yes"))))

df = df.withColumn("Grouping_new", 
          f.max(f.when(f.col("Grouping(old)_YN") == f.lit("Yes"), 
          f.col("Grouping(old)"))).over(user_30d_tracker))

Run Code Online (Sandbox Code Playgroud)

Adi*_*ibP 5

假设这就是您的数据框的样子

import pyspark.sql.functions as f
from pyspark.sql.window import Window

data = [[123, "20200101", 1, "Good_sub1"],
        [123, "20200101", 2, "Bad"],
        [123, "20200115", 1, "Bad"],
        [123, "20200115", 2, "Bad"],
        [123, "20200116", 1, "Good_sub2"],
        [123, "20200116", 2, "Bad"],
        [123, "20200116", 3, "Good_sub3"],
        [123, "20220901", 1, "Bad"]]

df = spark.createDataFrame(data, 
                           "userid:int, date:string, visit:int, `grouping(old)`:string")
df.show()
# +------+--------+-----+-------------+
# |userid|    date|visit|grouping(old)|
# +------+--------+-----+-------------+
# |   123|20200101|    1|    Good_sub1|
# |   123|20200101|    2|          Bad|
# |   123|20200115|    1|          Bad|
# |   123|20200115|    2|          Bad|
# |   123|20200116|    1|    Good_sub2|
# |   123|20200116|    2|          Bad|
# |   123|20200116|    3|    Good_sub3|
# |   123|20220901|    1|          Bad|
# +------+--------+-----+-------------+

days = lambda i: i * 86400 

user_30d_tracker = Window.partitionBy("userid")\
                         .orderBy(f.col("date").cast("timestamp").cast("long"))\
                         .rangeBetween(-days(30), 0)\
                         .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
Run Code Online (Sandbox Code Playgroud)

我们先看一下窗户user_30d_tracker。考虑到以下几点,该窗口需要进行一些更改

  1. 窗口的顺序似乎需要 的visitunix 时间戳旁边的列值datedate例如,如果子句中只有时间戳orderBy,spark 将无法保证访问 = 1 的行将出现在访问 = 2 之前。因此,我们需要在子句visit中包含列orderBy
  2. 附加帧rowsBetween将覆盖该rangeBetween帧,因此它将与预期不同。

一个选项是仅使用rangeBetween窗口框架。但是,由于rangeBetween框架只允许在表达式中使用 1 列,因此我们可以通过向 的 unix 时间戳orderBy添加值来使用解决方法(这就像视为自启动以来已经过去了多少秒)。visitdatevisitdate

user_30d_tracker =  Window\
                        .partitionBy("userid")\
                        .orderBy(f.unix_timestamp("date", "yyyyMMdd") + f.col("visit"))\
                        .rangeBetween(-days(30), 0)
Run Code Online (Sandbox Code Playgroud)

然后,要获取最新的非“坏”分组(旧)值,最好使用last函数(带有ignorenulls=True)而不是max因为它采用窗口中的最新值,而不是排序字符串的最大值。之后,用于coalesce填充新列中的空值。

df = (df
      .withColumn("Grouping(old)_YN",
                  f.when(f.col("Grouping(old)") == f.lit("Bad"), "No")
                  .otherwise(f.lit("Yes")))
      .withColumn("Grouping_new", 
                  f.last(f.when(f.col("Grouping(old)_YN") == f.lit("Yes"), 
                                f.col("Grouping(old)")), ignorenulls=True).over(user_30d_tracker))
      .withColumn("Grouping_new", f.coalesce(f.col("Grouping_new"), f.col("Grouping(old)")))
      )
df.show()

# +------+--------+-----+-------------+----------------+------------+
# |userid|    date|visit|grouping(old)|Grouping(old)_YN|Grouping_new|
# +------+--------+-----+-------------+----------------+------------+
# |   123|20200101|    1|    Good_sub1|             Yes|   Good_sub1|
# |   123|20200101|    2|          Bad|              No|   Good_sub1|
# |   123|20200115|    1|          Bad|              No|   Good_sub1|
# |   123|20200115|    2|          Bad|              No|   Good_sub1|
# |   123|20200116|    1|    Good_sub2|             Yes|   Good_sub2|
# |   123|20200116|    2|          Bad|              No|   Good_sub2|
# |   123|20200116|    3|    Good_sub3|             Yes|   Good_sub3|
# |   123|20220901|    1|          Bad|              No|         Bad|
# +------+--------+-----+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)