Utk*_*raf 3 scala dataframe apache-spark spark-graphx
我有从 2017 年 1 月 1 日到 2017 年 1 月 7 日的数据,这是每周想要的一周汇总数据。我以下列方式使用窗口函数
val df_v_3 = df_v_2.groupBy(window(col("DateTime"), "7 day"))
.agg(sum("Value") as "aggregate_sum")
.select("window.start", "window.end", "aggregate_sum")
Run Code Online (Sandbox Code Playgroud)
我在数据框中有数据作为
DateTime,value
2017-01-01T00:00:00.000+05:30,1.2
2017-01-01T00:15:00.000+05:30,1.30
--
2017-01-07T23:30:00.000+05:30,1.43
2017-01-07T23:45:00.000+05:30,1.4
Run Code Online (Sandbox Code Playgroud)
我得到的输出为:
2016-12-29T05:30:00.000+05:30,2017-01-05T05:30:00.000+05:30,723.87
2017-01-05T05:30:00.000+05:30,2017-01-12T05:30:00.000+05:30,616.74
Run Code Online (Sandbox Code Playgroud)
它显示我的一天是从 2016 年 12 月 29 日开始,但实际数据是从 2017 年 1 月 1 日开始,为什么会出现这种保证金?
对于像这样的滚动窗口,可以设置开始时间的偏移量,更多信息可以在博客中找到。使用滑动窗口,但是,通过将“窗口持续时间”和“滑动持续时间”设置为相同的值,它将与具有起始偏移量的滚动窗口相同。
语法如下,
window(column, window duration, sliding duration, starting offset)
Run Code Online (Sandbox Code Playgroud)
根据您的值,我发现 64 小时的偏移量将给出2017-01-01 00:00:00.
val data = Seq(("2017-01-01 00:00:00",1.0),
("2017-01-01 00:15:00",2.0),
("2017-01-08 23:30:00",1.43))
val df = data.toDF("DateTime","value")
.withColumn("DateTime", to_timestamp($"DateTime", "yyyy-MM-dd HH:mm:ss"))
val df2 = df
.groupBy(window(col("DateTime"), "1 week", "1 week", "64 hours"))
.agg(sum("value") as "aggregate_sum")
.select("window.start", "window.end", "aggregate_sum")
Run Code Online (Sandbox Code Playgroud)
将给出这个结果数据框:
+-------------------+-------------------+-------------+
| start| end|aggregate_sum|
+-------------------+-------------------+-------------+
|2017-01-01 00:00:00|2017-01-08 00:00:00| 3.0|
|2017-01-08 00:00:00|2017-01-15 00:00:00| 1.43|
+-------------------+-------------------+-------------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4034 次 |
| 最近记录: |