Merge records with overlapping dates

Aru*_*run 4 apache-spark apache-spark-sql pyspark

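I need to merge records with overlapping date ranges in a PySpark DataFrame. The MIN start date and the MAX end date of the overlapping records should become the start and end dates of the merged record.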

Sample records:

Input data

Item Code          Item name     Start_date       End_date
==============     =========     ===========      ===========
111                Item1        15-May-2004      20-Jun-2004
111                Item1        22-May-2004      07-Jun-2004
111                Item1        20-Jun-2004      13-Aug-2004
111                Item1        27-May-2004      30-Aug-2004
111                Item1        02-Sep-2004      23-Dec-2004
222                Item2        21-May-2004      19-Aug-2004

The output should look like this:

Item Code         Item name      Start_date       End_date
==============    =========      ===========      ===========
111               Item1          15-May-2004      30-Aug-2004
111               Item1          02-Sep-2004      23-Dec-2004
222               Item2          21-May-2004      19-Aug-2004 

mck*_*mck 6

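You can detect overlaps by computing, for each row, the latest End_date among the previous rows (ordered by Start_date within each item). A row whose Start_date is on or after that date starts a new group. A rolling sum of this flag gives each row a group id, and aggregating the earliest Start_date and the latest End_date per group yields the merged records.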

from pyspark.sql import functions as F, Window

# Rows of one item, ordered by start date
w = Window.partitionBy('Item Code', 'Item name').orderBy('Start_date')

df2 = df.withColumn(
    # Parse strings such as 15-May-2004 into dates
    'Start_date', 
    F.to_date('Start_date', 'dd-MMM-yyyy')
).withColumn(
    'End_date', 
    F.to_date('End_date', 'dd-MMM-yyyy')
).withColumn(
    # Latest End_date among all previous rows (null for the first row)
    'last_date', 
    F.max('End_date').over(
        w.rowsBetween(Window.unboundedPreceding, -1)
    )
).withColumn(
    # Rolling sum of the "starts a new group" flag gives the group id.
    # Use > instead of >= if ranges that only touch should also be merged.
    'group', 
    F.sum(
        F.coalesce(
            F.col('Start_date') >= F.col('last_date'), 
            F.lit(False)
        ).cast('int')
    ).over(w)
).groupBy(
    'Item Code', 'Item name', 'group'
).agg(
    # Earliest start and latest end per group, formatted back to strings
    F.date_format(F.min('Start_date'), 'dd-MMM-yyyy').alias('Start_date'), 
    F.date_format(F.max('End_date'), 'dd-MMM-yyyy').alias('End_date')
).drop('group')

df2.show()
+---------+---------+-----------+-----------+
|Item Code|Item name| Start_date|   End_date|
+---------+---------+-----------+-----------+
|      222|    Item2|21-May-2004|19-Aug-2004|
|      111|    Item1|15-May-2004|30-Aug-2004|
|      111|    Item1|02-Sep-2004|23-Dec-2004|
+---------+---------+-----------+-----------+

Behind the scenes, this is what the last_date and group columns look like before grouping:

+---------+---------+----------+----------+----------+-----+
|Item Code|Item name|Start_date|  End_date| last_date|group|
+---------+---------+----------+----------+----------+-----+
|      222|    Item2|2004-05-21|2004-08-19|      null|    0|
|      111|    Item1|2004-05-15|2004-06-20|      null|    0|
|      111|    Item1|2004-05-22|2004-06-07|2004-06-20|    0|
|      111|    Item1|2004-05-27|2004-08-30|2004-06-20|    0|
|      111|    Item1|2004-06-20|2004-08-13|2004-08-30|    0|
|      111|    Item1|2004-09-02|2004-12-23|2004-08-30|    1|
+---------+---------+----------+----------+----------+-----+
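
If it helps to follow the last_date and group columns by hand, here is a minimal plain-Python sketch of the same idea for a single item. It is not PySpark and not part of the answer's code: `merge_overlaps` and `rows` are hypothetical names used only for illustration, and the sketch assumes the dates are already parsed and that a range starting exactly on the latest earlier end date opens a new group, as the `>=` test does.

```python
from datetime import date


def merge_overlaps(intervals):
    """Merge (start, end) date pairs for one item (illustrative helper)."""
    merged = []
    last_date = None  # latest end date among the rows seen so far
    for start, end in sorted(intervals):  # same ordering idea as orderBy('Start_date')
        if last_date is None or start >= last_date:
            # First row, or no overlap with anything earlier: open a new group
            merged.append([start, end])
        else:
            # Overlaps an earlier row: extend the current group's end date
            merged[-1][1] = max(merged[-1][1], end)
        last_date = end if last_date is None else max(last_date, end)
    return [tuple(pair) for pair in merged]


# Item 111 from the question
rows = [
    (date(2004, 5, 15), date(2004, 6, 20)),
    (date(2004, 5, 22), date(2004, 6, 7)),
    (date(2004, 6, 20), date(2004, 8, 13)),
    (date(2004, 5, 27), date(2004, 8, 30)),
    (date(2004, 9, 2), date(2004, 12, 23)),
]

for start, end in merge_overlaps(rows):
    print(start.isoformat(), end.isoformat())
# 2004-05-15 2004-08-30
# 2004-09-02 2004-12-23
```

The running `last_date` plays the role of the window max over the previous rows, and opening a new list entry corresponds to the rolling sum increasing by one.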