Cri*_*ber 6 python apache-spark apache-spark-sql pyspark
I have a dataframe with a status (integer) and a timestamp. Since I receive a lot of "duplicate" status messages, I want to reduce the dataframe by dropping any row that repeats the previous status within the 24-hour window following a "new" status. That means:
For example:
import datetime

from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

data = [(10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-01 04:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-01 23:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-02 06:00:00", "%Y-%m-%d %H:%M:%S")),
        (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S"))
]
myschema = StructType(
    [
        StructField("status", IntegerType()),
        StructField("ts", TimestampType())
    ]
)
df = spark.createDataFrame(data=data, schema=myschema)
Status 10 is valid from 2022-01-01 00:00:00 until 2022-01-02 00:00:00, and then again from 2022-01-02 05:00:00 until 2022-01-03 05:00:00. Status 20 is valid from 2022-01-01 03:00:00 until 2022-01-02 03:00:00. So I want to keep only these messages:
data = [(10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),
        (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S"))
]
I know how to do this in plain Python by looping over the rows and keeping track of the latest kept change. I suspect I need a window function with partitionBy + orderBy, but I can't figure out the details... any help is appreciated.
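For reference, this is roughly the loop-based version I have in mind (a minimal sketch; it assumes the rule is that a row is dropped whenever a row with the same status was already kept less than 24 hours earlier):

import datetime

def reduce_messages(rows):
    # Keep a (status, ts) row only if the last kept row with the same
    # status is at least 24 hours older.
    last_kept = {}  # status -> timestamp of the last kept row
    kept = []
    for status, ts in sorted(rows, key=lambda r: r[1]):
        prev = last_kept.get(status)
        if prev is None or ts - prev >= datetime.timedelta(hours=24):
            kept.append((status, ts))
            last_kept[status] = ts
    return kept

# With the example data above this keeps (10, 2022-01-01 00:00:00),
# (20, 2022-01-01 03:00:00) and (10, 2022-01-02 05:00:00).
print(reduce_messages(data))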
That's a nice question, and I spent some time trying to figure it out. I came up with something like this:
1. First I calculate the time difference in hours from the previous element for each row within the window.
2. Then I aggregate those values into an array column for each row (the first row has null, the next [4], the third [4, 19], etc.).
3. Then I use an accumulator to calculate a cumulative sum with your condition, so it "resets" after 24 h (see the short sketch after this list).
4. Then I use this cumulative sum to assign an "inner window" number.
5. Filter at the end and I'm done.
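To make step 3 concrete, here is a minimal sketch of how the aggregate expression resets once the running sum reaches 24 hours (the demo / diffs names are only illustrative):

import pyspark.sql.functions as F

# Hour diffs collected so far for one row: 4, 19, 6, 1
demo = spark.createDataFrame([([4, 19, 6, 1],)], ["diffs"])

# Fold the array left to right: keep adding while the accumulator is < 24,
# otherwise restart the sum from the current element.
demo.select(
    F.expr("aggregate(diffs, 0L, (acc, x) -> IF(acc < 24, acc + x, x))").alias("cumsum")
).show()
# accumulator: 0 -> 4 -> 23 -> 29 -> reset, so cumsum = 1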
This is probably not the best approach, so comments are welcome :D
Sample code:
import datetime

from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
import pyspark.sql.functions as F

data = [
    (10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-01 04:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-01 23:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 06:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 06:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 16:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-03 16:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-03 17:00:00", "%Y-%m-%d %H:%M:%S")),
    (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S"))
]
myschema = StructType(
    [StructField("status", IntegerType()), StructField("ts", TimestampType())]
)
df = spark.createDataFrame(data=data, schema=myschema)

window = Window.partitionBy("status").orderBy("ts")

# Time difference in hours from the previous row within the status window
df = df.withColumn(
    "timeDiffFromPrev",
    ((F.col("ts").cast("long") - F.lag("ts").over(window).cast("long")) / 3600).cast("long"),
)

# Collect the diffs seen so far into an array column per row
df = df.withColumn("aggTimeDiffFromPrev", F.collect_list("timeDiffFromPrev").over(window))

# Running sum of the diffs; it resets to the current element once it reaches 24 h
expr = "AGGREGATE(aggTimeDiffFromPrev, 0L, (accumulator, element) -> IF(accumulator < 24, accumulator + element, element))"
df = df.select("status", "ts", F.expr(expr).alias("cumsum"))

# Assign a window number based on cumsum - anything greater than 24 is the beginning of a new window
df = df.withColumn(
    "window_number",
    F.sum(
        F.when((F.col("cumsum") > 24), F.lit(1)).otherwise(F.lit(0))
    ).over(window),
)

innerWindow = Window.partitionBy("status", "window_number").orderBy("ts")

# Keep only the first row in each inner window, drop the rest
df = df.withColumn("row_number", F.row_number().over(innerWindow)).filter(
    F.col("row_number") == F.lit(1)
).drop("window_number", "row_number", "cumsum")
df.show()
I added a couple of rows to the input just for testing, and for my input dataset the result is:
+------+-------------------+
|status| ts|
+------+-------------------+
| 10|2022-01-01 00:00:00|
| 10|2022-01-02 05:00:00|
| 10|2022-01-03 16:00:00|
| 20|2022-01-01 03:00:00|
+------+-------------------+