The following code works as expected on a batch DataFrame, but fails on a streaming query with the infamous AnalysisException:
Non-time-based windows are not supported on streaming DataFrames/Datasets
from pyspark.sql.functions import *
from pyspark.sql.window import Window

temp = [
    ('Alice', 1),
    ('Alice', 60),
    ('Alice', 160),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1112),
    ('Bob', 3),
    ('Alice', 2),
    ('Bob', 2),
    ('Alice', 3),
    ('Bob', 1)
]
temp_df = spark.createDataFrame(temp, ["user", "ev_timestamp"])

maxSessionDuration = 60 * 10  # Maximum gap of 10 minutes between events of the same session.
client_fp_time_window = Window.partitionBy("user").orderBy("ev_timestamp")

rowsWithSessionIds = temp_df \
    .select("user", "ev_timestamp",
            # Previous event time per user -- this is the call that streaming rejects.
            lag("ev_timestamp", 1).over(client_fp_time_window).alias("prevEvTimestamp")) \
    .select("user", "ev_timestamp",
            # An event starts a new session when the gap to the previous event
            # reaches the threshold (or when there is no previous event at all).
            when((col("ev_timestamp").cast('long') - col("prevEvTimestamp").cast('long'))
                 < maxSessionDuration, 0)
            .otherwise(1).alias("isNewSession")) \
    .select("user", "ev_timestamp",
            # A running sum of session starts yields a per-user session id.
            sum("isNewSession").over(client_fp_time_window).alias("sessionId"))
display(rowsWithSessionIds)

sessionsDF = rowsWithSessionIds \
    .groupBy("user", "sessionId") \
    .agg(min("ev_timestamp").alias("startTime"), max("ev_timestamp").alias("endTime"), count("*").alias("count")) \
    .alias('Session')
display(sessionsDF)
I understand this happens because the lag() function is not supported on streaming queries. The recommended alternative is the mapGroupsWithState() approach, but that is limited to Scala/Java.
How can I achieve this in PySpark? What other alternatives are there for sessionizing a structured stream in PySpark?
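For completeness: Spark 3.4 later added applyInPandasWithState, a PySpark counterpart to mapGroupsWithState, so the Scala/Java limitation no longer holds on recent releases. Below is a minimal sketch following the pattern in the Spark documentation; events stands in for a hypothetical streaming DataFrame with a user column, and the stateful function only keeps a per-user event count with a processing-time timeout rather than reproducing the full sessionization:

import pandas as pd
from typing import Iterable, Tuple
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def count_events(
    key: Tuple[str], pdfs: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
    if state.hasTimedOut:
        # No activity for 10 minutes: emit the finished "session" and drop the state.
        (count,) = state.get
        state.remove()
        yield pd.DataFrame({'user': [key[0]], 'count': [count]})
    else:
        # Add this micro-batch's events to the running per-user count.
        count = sum(len(pdf) for pdf in pdfs)
        if state.exists:
            (old_count,) = state.get
            count += old_count
        state.update((count,))
        state.setTimeoutDuration(10 * 60 * 1000)  # reset the 10-minute timeout
        yield pd.DataFrame()

# `events` is a hypothetical streaming DataFrame, e.g. from readStream.
counts = events.groupBy('user').applyInPandasWithState(
    count_events,
    outputStructType='user STRING, count LONG',
    stateStructType='count LONG',
    outputMode='update',
    timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
)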
The desired output for each batch looks like this:
user   sessionId  startTime  endTime  count
Bob    1          1          3        3
Alice  1          1          160      5
Alice  2          1111       1112     7
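(To spell out the expected grouping: after sorting Alice's timestamps -- 1, 2, 3, 60, 160, 1111 ×6, 1112 -- every consecutive gap is below the 600-second threshold except the 951-second jump from 160 to 1111, so she gets two sessions of 5 and 7 events; Bob's events at 1, 2 and 3 form a single session of 3.)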
As of Spark >= 3.2.0, F.session_window is available, and it works for both streaming and batch. The output differs slightly from yours: instead of a sessionId we get a session_window column.
from pyspark.sql import functions as F

temp = [
    ('Alice', 1),
    ('Alice', 60),
    ('Alice', 160),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1112),
    ('Bob', 3),
    ('Alice', 2),
    ('Bob', 2),
    ('Alice', 3),
    ('Bob', 1)
]
temp_df = spark.createDataFrame(temp, ['user', 'ev_timestamp_sec'])
# session_window needs a real timestamp column, so convert the epoch seconds.
temp_df = temp_df.withColumn(
    'ev_timestamp', F.timestamp_seconds('ev_timestamp_sec')
)
# For Structured Streaming, we have to set a watermark.
temp_df = temp_df.withWatermark('ev_timestamp', '5 minutes')

sess_window = F.session_window(
    timeColumn='ev_timestamp', gapDuration='10 minutes'
)
agg_cols = [
    F.min('ev_timestamp').alias('startTime'),
    F.max('ev_timestamp').alias('endTime'),
    F.count('*').alias('count')
]
sessions_df = temp_df.groupBy('user', sess_window).agg(*agg_cols)
sessions_df.show(3, False)
# +-----+------------------------------------------+-------------------+-------------------+-----+
# |user |session_window |startTime |endTime |count|
# +-----+------------------------------------------+-------------------+-------------------+-----+
# |Alice|{1970-01-01 00:00:01, 1970-01-01 00:12:40}|1970-01-01 00:00:01|1970-01-01 00:02:40|5 |
# |Alice|{1970-01-01 00:18:31, 1970-01-01 00:28:32}|1970-01-01 00:18:31|1970-01-01 00:18:32|7 |
# |Bob |{1970-01-01 00:00:01, 1970-01-01 00:10:03}|1970-01-01 00:00:01|1970-01-01 00:00:03|3 |
# +-----+------------------------------------------+-------------------+-------------------+-----+
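To actually run this on a stream, the same groupBy can be attached to a streaming source and sink. A minimal sketch, assuming the built-in rate source as stand-in input and a memory sink for inspection (neither is part of the answer above); note that streaming session windows require a watermark plus at least one extra grouping key, and emit finalized rows in append mode:

from pyspark.sql import functions as F

# Synthetic input: the rate source emits (timestamp, value) rows continuously.
events = (
    spark.readStream.format('rate')
    .option('rowsPerSecond', 10)
    .load()
    .withColumn('user', (F.col('value') % 2).cast('string'))
    .withColumnRenamed('timestamp', 'ev_timestamp')
)

sessions = (
    events
    .withWatermark('ev_timestamp', '5 minutes')
    .groupBy('user', F.session_window('ev_timestamp', '10 minutes'))
    .agg(
        F.min('ev_timestamp').alias('startTime'),
        F.max('ev_timestamp').alias('endTime'),
        F.count('*').alias('count'),
    )
)

# Append mode: a session row is emitted only once the watermark passes its end.
query = (
    sessions.writeStream
    .outputMode('append')
    .format('memory')
    .queryName('sessions')
    .start()
)
# Inspect results later with: spark.table('sessions').show(truncate=False)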