PySpark: remove duplicate messages within a 24-hour window after an initial new value


I have a DataFrame with a status (integer) and a timestamp. Since I receive a lot of "duplicate" status messages, I want to reduce the DataFrame by dropping any row that repeats the previous status within the 24-hour window that follows a "new" status, meaning:

  • The first 24-hour window starts with the first message of a given status.
  • The next 24-hour window for that status starts with the first message that arrives after the previous 24-hour window ends (the windows are not contiguous).

As an example:

import datetime

from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType

data = [(10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-01 04:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-01 23:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-02 06:00:00", "%Y-%m-%d %H:%M:%S")),

        (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S"))
      ]

myschema = StructType(
    [
        StructField("status", IntegerType()),
        StructField("ts", TimestampType())
    ]
)
df = spark.createDataFrame(data=data, schema=myschema)  # assumes an active SparkSession named `spark`
  • The first 24-hour window for status 10 runs from 2022-01-01 00:00:00 to 2022-01-02 00:00:00.
  • The second 24-hour window for status 10 runs from 2022-01-02 05:00:00 to 2022-01-03 05:00:00.
  • The first 24-hour window for status 20 runs from 2022-01-01 03:00:00 to 2022-01-02 03:00:00.

So the messages I want to keep are:

data = [(10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),

        (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S"))
      ]

I know how to do this in Python with a loop that tracks the latest change, and I suspect I need a window function with partitionBy + orderBy, but I can't figure out the details... any help is appreciated.
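
For reference, a minimal pure-Python version of that loop could look like the sketch below (the function name and the choice to treat a gap of exactly 24 hours as the start of a new window are illustrative assumptions, not part of the question):

import datetime

def keep_first_per_window(rows):
    # rows: iterable of (status, ts) tuples; returns the rows to keep,
    # i.e. the first message of each non-contiguous 24h window per status.
    window_start = {}  # status -> ts of the message that opened the current window
    kept = []
    for status, ts in sorted(rows, key=lambda r: r[1]):
        start = window_start.get(status)
        if start is None or ts - start >= datetime.timedelta(hours=24):
            window_start[status] = ts
            kept.append((status, ts))
    return kept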

Answer (by M_S*_*M_S):

This is a nice question, and I spent some time trying to figure it out. I ended up with something like this:

  • At first I calculate the time diff in hours from the previous element for each row, within a window partitioned by status.
  • Then I aggregate the above values into a list column for each row (the first row has null, the next [4], the third [4, 19], etc.).
  • Then I use an accumulator to calculate a cumulative sum with your condition, so it "resets" after 24h.
  • Then I use this cumulative sum to assign an "inner window" number.
  • Filtering at the end, and I am done.

This is probably not the best approach, so comments are welcome :D

Example code:

import datetime
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType

data = [
    (10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-01 04:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-01 23:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 06:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 06:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 16:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-03 16:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-03 17:00:00", "%Y-%m-%d %H:%M:%S")),
    (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S"))
]

myschema = StructType(
    [StructField("status", IntegerType()), StructField("ts", TimestampType())]
)
df = spark.createDataFrame(data=data, schema=myschema)

window = Window.partitionBy("status").orderBy(["ts"])

# Hours elapsed since the previous message with the same status (truncated to whole hours)
df = df.withColumn(
        "timeDiffFromPrev",
        ((F.col("ts").cast("long") - F.lag("ts").over(window).cast("long")) / 3600).cast("long"),
    )

# Running list of all previous hour diffs for this status (collect_list skips the initial null)
df = df.withColumn('aggTimeDiffFromPrev', F.collect_list('timeDiffFromPrev').over(window))

# Fold the diffs, resetting the accumulator once the running sum passes 24h
expr = "AGGREGATE(aggTimeDiffFromPrev, 0l, (accumulator, element) -> IF(accumulator < 24, accumulator + element, element))"
df = df.select('status', 'ts', F.expr(expr).alias('cumsum'))
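# For the status-10 sample rows the hour diffs are [null, 4, 19, 6, 1, 0, 10, 24, 1],
# so cumsum evaluates row by row to 0, 4, 23, 29, 1, 1, 11, 35, 1 - the fold restarts
# from the current element whenever the accumulated total has passed 24 (at 29 and 35).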

# Assign a window number based on cumsum - anything greater than 24 begins a new window
df = df.withColumn(
        "window_number",
        F.sum(
            F.when((F.col("cumsum") > 24), F.lit(1)).otherwise(F.lit(0))
        ).over(window),
    )
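
# For the status-10 sample rows, cumsum exceeds 24 at rows 4 and 8,
# so window_number becomes 0, 0, 0, 1, 1, 1, 1, 2, 2.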

innerWindow = Window.partitionBy("status", "window_number").orderBy(["ts"])

# Keep only the first row of each inner window, then drop the helper columns
# (timeDiffFromPrev and aggTimeDiffFromPrev were already dropped by the select above)
df = df.withColumn("row_number", F.row_number().over(innerWindow)).filter(
    F.col("row_number") == F.lit(1)
).drop("window_number", "row_number", "cumsum")

df.show()
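
To make the AGGREGATE step easier to follow, here is a rough plain-Python equivalent of that fold for a single row's aggTimeDiffFromPrev list (purely illustrative; it just mirrors the lambda in the SQL expression):

from functools import reduce

# Same fold as the AGGREGATE expression: keep adding hour diffs while the
# running total is below 24, otherwise restart from the current element.
def cumsum_with_reset(diffs):
    return reduce(lambda acc, el: acc + el if acc < 24 else el, diffs, 0)

print(cumsum_with_reset([4, 19, 6]))     # 29 -> greater than 24, this row opens a new window
print(cumsum_with_reset([4, 19, 6, 1]))  # 1  -> the fold restarted after the reset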

I added a few rows to the input just for testing; for my input dataset the result is:

+------+-------------------+
|status|                 ts|
+------+-------------------+
|    10|2022-01-01 00:00:00|
|    10|2022-01-02 05:00:00|
|    10|2022-01-03 16:00:00|
|    20|2022-01-01 03:00:00|
+------+-------------------+