lun*_*box 3 python dataframe pandas apache-spark pyspark
如果我有一个日期格式为 MM/DD/YYYY 的表格,如下所示。
+---+-----------+----------+
| id| startdate| enddate|
+---+-----------+----------+
| 1| 01/01/2022|01/31/2022|
| 1| 02/01/2022|02/28/2022|
| 1| 03/01/2022|03/31/2022|
| 2| 01/01/2022|03/01/2022|
| 2| 03/05/2022|03/31/2022|
| 2| 04/01/2022|04/05/2022|
+---+-----------+----------+
Run Code Online (Sandbox Code Playgroud)
如何根据 id 列进行分组以及开始日期和结束日期是否连续?
一件事是,如果间隔超过一天,则将该行保留在新行上,这样上表将变为:
+---+-----------+----------+
| id| startdate| enddate|
+---+-----------+----------+
| 1| 01/01/2022|31/03/2022|
| 2| 01/01/2022|03/01/2022|
| 2| 03/05/2022|04/05/2022|
+---+-----------+----------+
Run Code Online (Sandbox Code Playgroud)
id = 1 成为一行,因为 id =1 的所有日期都是连续的,即没有间隙 > 1,但 id 2 有两行,因为 03/01/2022 和 03/05/2022 之间有间隙。
这是会话化问题的一个特殊情况(即根据某些条件识别数据中的会话)。
这是使用 Windows 的可能解决方案。解决方案背后的逻辑:
enddate
一个与相同的关联起来id
startdate
计算每次与前一次之间的天数差异enddate
session_index
,即截至该行看到的新会话数id
和session_index
w = Window.partitionBy("id")\
.orderBy("startdate")
df = df \
.select(
F.col("id"),
F.to_date("startdate", "MM/dd/yyyy").alias("startdate"),
F.to_date("enddate", "MM/dd/yyyy").alias("enddate")
) \
.withColumn("previous_enddate", F.lag('enddate', offset=1).over(w)) \
.withColumn("date_diff", F.datediff(F.col("startdate"), F.col("previous_enddate"))) \
.withColumn("is_new_session", F.col("date_diff").isNull() | (F.col("date_diff") > 1)) \
.withColumn("session_index", F.sum(F.col("is_new_session").cast("int")).over(w))
df.groupBy("id", "session_index") \
.agg(
F.min("startdate").alias("startdate"),
F.max("enddate").alias("enddate")
) \
.drop("session_index")
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
134 次 |
最近记录: |