检查日期是否有超过一天的间隔,如果在 Spark 中连续则将其分组

lun*_*box 3 python dataframe pandas apache-spark pyspark

如果我有一个日期格式为 MM/DD/YYYY 的表格,如下所示。

+---+-----------+----------+
| id|  startdate|   enddate|
+---+-----------+----------+
|  1| 01/01/2022|01/31/2022|  
|  1| 02/01/2022|02/28/2022|  
|  1| 03/01/2022|03/31/2022|  
|  2| 01/01/2022|03/01/2022|  
|  2| 03/05/2022|03/31/2022| 
|  2| 04/01/2022|04/05/2022|
+---+-----------+----------+
Run Code Online (Sandbox Code Playgroud)

如何根据 id 列进行分组以及开始日期和结束日期是否连续?

一件事是,如果间隔超过一天,则将该行保留在新行上,这样上表将变为:

+---+-----------+----------+
| id|  startdate|   enddate|
+---+-----------+----------+
|  1| 01/01/2022|31/03/2022|  
|  2| 01/01/2022|03/01/2022|  
|  2| 03/05/2022|04/05/2022|  
+---+-----------+----------+
Run Code Online (Sandbox Code Playgroud)

id = 1 成为一行,因为 id =1 的所有日期都是连续的,即没有间隙 > 1,但 id 2 有两行,因为 03/01/2022 和 03/05/2022 之间有间隙。

vin*_*sce 5

这是会话化问题的一个特殊情况(即根据某些条件识别数据中的会话)。

这是使用 Windows 的可能解决方案。解决方案背后的逻辑:

  1. 在每一行将时间上的前enddate一个与相同的关联起来id
  2. startdate计算每次与前一次之间的天数差异enddate
  3. 识别没有前一行或距前一行至少两天的所有行
  4. 在每一行关联 a session_index,即截至该行看到的新会话数
  5. 聚合分组依据idsession_index
w = Window.partitionBy("id")\
          .orderBy("startdate")

df = df \
    .select(
        F.col("id"),
        F.to_date("startdate", "MM/dd/yyyy").alias("startdate"),
        F.to_date("enddate", "MM/dd/yyyy").alias("enddate")
    ) \
    .withColumn("previous_enddate", F.lag('enddate', offset=1).over(w)) \
    .withColumn("date_diff", F.datediff(F.col("startdate"), F.col("previous_enddate"))) \
    .withColumn("is_new_session", F.col("date_diff").isNull() | (F.col("date_diff") > 1)) \
    .withColumn("session_index", F.sum(F.col("is_new_session").cast("int")).over(w))

df.groupBy("id", "session_index") \
    .agg(
        F.min("startdate").alias("startdate"),
        F.max("enddate").alias("enddate")
    ) \
    .drop("session_index")
Run Code Online (Sandbox Code Playgroud)