Jan*_*ice 5 apache-spark apache-spark-sql pyspark
我可以获得有关如何在 pyspark 中编写此逻辑的帮助吗?
假设我有如附图所示的表格。
因此,给定:日期、用户 ID、访问、分组(旧)作为输入,我想创建一个名为分组(新)的新列,以便执行以下操作:
对于任何给定的用户 ID:
首先检查一下分组是什么(旧)。如果是!= Bad,则分组(新)=分组(旧)
如果分组(旧)= Bad,则应用最近日期的最近访问的分组(旧),使其 != Bad
但是,如果距离上一个日期 != Bad 的最近分组(旧)已经超过 30 天,则使分组(新)= Bad(因为数据已过时)
我尝试过但没有按预期工作:
days = lambda i: i * 86400
user_30d_tracker =
Window.partitionBy("userid")
.orderBy(f.col("date").cast("timestamp").cast("long"))
.rangeBetween(-days(30), 0)
.rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
df = (df.withColumn("Grouping(old)_YN",
f.when(f.col("Grouping(old)")==f.lit("Bad"), "No")
.otherwise(f.lit("Yes"))))
df = df.withColumn("Grouping_new",
f.max(f.when(f.col("Grouping(old)_YN") == f.lit("Yes"),
f.col("Grouping(old)"))).over(user_30d_tracker))
Run Code Online (Sandbox Code Playgroud)
假设这就是您的数据框的样子
import pyspark.sql.functions as f
from pyspark.sql.window import Window
data = [[123, "20200101", 1, "Good_sub1"],
[123, "20200101", 2, "Bad"],
[123, "20200115", 1, "Bad"],
[123, "20200115", 2, "Bad"],
[123, "20200116", 1, "Good_sub2"],
[123, "20200116", 2, "Bad"],
[123, "20200116", 3, "Good_sub3"],
[123, "20220901", 1, "Bad"]]
df = spark.createDataFrame(data,
"userid:int, date:string, visit:int, `grouping(old)`:string")
df.show()
# +------+--------+-----+-------------+
# |userid| date|visit|grouping(old)|
# +------+--------+-----+-------------+
# | 123|20200101| 1| Good_sub1|
# | 123|20200101| 2| Bad|
# | 123|20200115| 1| Bad|
# | 123|20200115| 2| Bad|
# | 123|20200116| 1| Good_sub2|
# | 123|20200116| 2| Bad|
# | 123|20200116| 3| Good_sub3|
# | 123|20220901| 1| Bad|
# +------+--------+-----+-------------+
days = lambda i: i * 86400
user_30d_tracker = Window.partitionBy("userid")\
.orderBy(f.col("date").cast("timestamp").cast("long"))\
.rangeBetween(-days(30), 0)\
.rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
Run Code Online (Sandbox Code Playgroud)
我们先看一下窗户user_30d_tracker。考虑到以下几点,该窗口需要进行一些更改
visitunix 时间戳旁边的列值date。date例如,如果子句中只有时间戳orderBy,spark 将无法保证访问 = 1 的行将出现在访问 = 2 之前。因此,我们需要在子句visit中包含列orderBy。rowsBetween将覆盖该rangeBetween帧,因此它将与预期不同。一个选项是仅使用rangeBetween窗口框架。但是,由于rangeBetween框架只允许在表达式中使用 1 列,因此我们可以通过向 的 unix 时间戳orderBy添加值来使用解决方法(这就像视为自启动以来已经过去了多少秒)。visitdatevisitdate
user_30d_tracker = Window\
.partitionBy("userid")\
.orderBy(f.unix_timestamp("date", "yyyyMMdd") + f.col("visit"))\
.rangeBetween(-days(30), 0)
Run Code Online (Sandbox Code Playgroud)
然后,要获取最新的非“坏”分组(旧)值,最好使用last函数(带有ignorenulls=True)而不是max因为它采用窗口中的最新值,而不是排序字符串的最大值。之后,用于coalesce填充新列中的空值。
df = (df
.withColumn("Grouping(old)_YN",
f.when(f.col("Grouping(old)") == f.lit("Bad"), "No")
.otherwise(f.lit("Yes")))
.withColumn("Grouping_new",
f.last(f.when(f.col("Grouping(old)_YN") == f.lit("Yes"),
f.col("Grouping(old)")), ignorenulls=True).over(user_30d_tracker))
.withColumn("Grouping_new", f.coalesce(f.col("Grouping_new"), f.col("Grouping(old)")))
)
df.show()
# +------+--------+-----+-------------+----------------+------------+
# |userid| date|visit|grouping(old)|Grouping(old)_YN|Grouping_new|
# +------+--------+-----+-------------+----------------+------------+
# | 123|20200101| 1| Good_sub1| Yes| Good_sub1|
# | 123|20200101| 2| Bad| No| Good_sub1|
# | 123|20200115| 1| Bad| No| Good_sub1|
# | 123|20200115| 2| Bad| No| Good_sub1|
# | 123|20200116| 1| Good_sub2| Yes| Good_sub2|
# | 123|20200116| 2| Bad| No| Good_sub2|
# | 123|20200116| 3| Good_sub3| Yes| Good_sub3|
# | 123|20220901| 1| Bad| No| Bad|
# +------+--------+-----+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1260 次 |
| 最近记录: |