小编Dea*_*ock的帖子

如何在pyspark中的流式查询中生成会话窗口?

以下代码按预期对批处理工作,但在使用臭名昭著的流查询时失败AnalysisException

流式数据帧/数据集不支持非基于时间的窗口

from pyspark.sql.functions import *
from pyspark.sql.window import Window

temp = [
  ('Alice', 1),
  ('Alice', 60),
  ('Alice', 160),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1112),
  ('Bob', 3),
  ('Alice', 2),
  ('Bob', 2),
  ('Alice', 3),
  ('Bob', 1)
]

temp_df = spark.createDataFrame(temp, ["user", "ev_timestamp"])

maxSessionDuration = 60 * 10 # Max session duration of 10 minutes.
client_fp_time_window = Window.partitionBy("user").orderBy("ev_timestamp")

rowsWithSessionIds = temp_df \
    .select("user", "ev_timestamp", lag("ev_timestamp", 1).over(client_fp_time_window).alias("prevEvTimestamp")) \
    .select("user", "ev_timestamp", 
            when(
              (col("ev_timestamp").cast('long') - …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark databricks spark-structured-streaming azure-databricks

5
推荐指数
1
解决办法
788
查看次数