我可以获得有关如何在 pyspark 中编写此逻辑的帮助吗?
假设我有如附图所示的表格。
因此,给定:日期、用户 ID、访问、分组(旧)作为输入,我想创建一个名为分组(新)的新列,以便执行以下操作:
对于任何给定的用户 ID:
首先检查一下分组是什么(旧)。如果是!= Bad,则分组(新)=分组(旧)
如果分组(旧)= Bad,则应用最近日期的最近访问的分组(旧),使其 != Bad
但是,如果距离上一个日期 != Bad 的最近分组(旧)已经超过 30 天,则使分组(新)= Bad(因为数据已过时)
我尝试过但没有按预期工作:
days = lambda i: i * 86400
user_30d_tracker =
Window.partitionBy("userid")
.orderBy(f.col("date").cast("timestamp").cast("long"))
.rangeBetween(-days(30), 0)
.rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
df = (df.withColumn("Grouping(old)_YN",
f.when(f.col("Grouping(old)")==f.lit("Bad"), "No")
.otherwise(f.lit("Yes"))))
df = df.withColumn("Grouping_new",
f.max(f.when(f.col("Grouping(old)_YN") == f.lit("Yes"),
f.col("Grouping(old)"))).over(user_30d_tracker))
Run Code Online (Sandbox Code Playgroud)