What is the correct way to specify a window interval in Spark SQL, using two predefined boundaries?
I am trying to sum up values in my table over a window of "3 hours ago to 2 hours ago".
When I run this query:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;
That works: I get the result I expect, i.e. the sums of the values that fall into the 2-hour rolling window.
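A minimal, self-contained sketch of that working setup, assuming a toy my_temp_table with the columns a, b, time_value and value from the query above (the sample rows and Spark session are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("rolling-window-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Toy data shaped like the table in the question: a, b, time_value, value
val data = Seq(
  ("x", "y", "2020-01-01 10:00:00", 1L),
  ("x", "y", "2020-01-01 11:30:00", 2L),
  ("x", "y", "2020-01-01 12:00:00", 4L)
).toDF("a", "b", "time_value", "value")
data.createOrReplaceTempView("my_temp_table")

// The 2-hour rolling window that works
spark.sql("""
  select *, sum(value) over (
    partition by a, b
    order by cast(time_value as timestamp)
    range between interval 2 hours preceding and current row
  ) as sum_value
  from my_temp_table
""").show(false)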
Now, what I need is for the rolling window not to be bound to the current row, but to take into account rows between 3 hours ago and 2 hours ago. I tried:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;
But I got an extraneous input 'hours' expecting {'PRECEDING', 'FOLLOWING'} error.
I also tried:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;
But then I got a different error: scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
The third option I tried was:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;
and it does not work as expected either: cannot resolve 'RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING' due to data type mismatch
I am having a hard time finding documentation for the interval type: this link does not say enough, and the other information I found is somewhat half-baked, at least as far as I could tell.
Since range intervals did not work, I had to turn to a different approach. It came down to this:
In my case, I had to run the computation for every hour of the day and combine those "hourly" results (i.e. a list of 24 data frames) into a single "daily" data frame.
From a very high-level point of view, the code looks like this:
// assumes `data`, `hoursToStart` and `hoursToEnd` are already defined, plus
// import spark.implicits._ and org.apache.spark.sql.functions.lit
val hourlyDFs = for ((hourStart, hourEnd) <- hoursToStart.zip(hoursToEnd)) yield {
  // keep only the rows that fall into this hour range of the day
  val hourlyData = data.where($"hour" >= lit(hourStart) && $"hour" <= lit(hourEnd))
  // do stuff
  // return a data frame
  hourlyData
}
hourlyDFs.reduce(_ union _)
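As a side note, an alternative that might express the "3 hours ago to 2 hours ago" frame without interval literals is to order by epoch seconds and use plain numeric bounds (10800 s = 3 h, 7200 s = 2 h). This is only a sketch, reusing the toy my_temp_table from the sketch above, and I have not verified it on the Spark version used here:

// order by epoch seconds instead of a timestamp, so the frame bounds are plain numbers
spark.sql("""
  select *, sum(value) over (
    partition by a, b
    order by cast(cast(time_value as timestamp) as long)
    range between 10800 preceding and 7200 preceding
  ) as sum_value
  from my_temp_table
""").show(false)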