小编Jan*_*ice的帖子

如何在时间段限制和其他条件下使用 PySpark 函数的窗口

我可以获得有关如何在 pyspark 中编写此逻辑的帮助吗?

假设我有如附图所示的表格。

因此,给定:日期、用户 ID、访问、分组(旧)作为输入,我想创建一个名为分组(新)的新列,以便执行以下操作:

对于任何给定的用户 ID:

  1. 首先检查一下分组是什么(旧)。如果是!= Bad,则分组(新)=分组(旧)

  2. 如果分组(旧)= Bad,则应用最近日期的最近访问的分组(旧),使其 != Bad

  3. 但是,如果距离上一个日期 != Bad 的最近分组(旧)已经超过 30 天,则使分组(新)= Bad(因为数据已过时)

在此输入图像描述

我尝试过但没有按预期工作:

days = lambda i: i * 86400 

user_30d_tracker = 
  Window.partitionBy("userid")
    .orderBy(f.col("date").cast("timestamp").cast("long"))
    .rangeBetween(-days(30), 0)
    .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)


df = (df.withColumn("Grouping(old)_YN",
      f.when(f.col("Grouping(old)")==f.lit("Bad"), "No")
       .otherwise(f.lit("Yes"))))

df = df.withColumn("Grouping_new", 
          f.max(f.when(f.col("Grouping(old)_YN") == f.lit("Yes"), 
          f.col("Grouping(old)"))).over(user_30d_tracker))

Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
1260
查看次数

标签 统计

apache-spark ×1

apache-spark-sql ×1

pyspark ×1