在 Spark 中使用 Windows 函数进行每周聚合

Utk*_*raf 3 scala dataframe apache-spark spark-graphx

我有从 2017 年 1 月 1 日到 2017 年 1 月 7 日的数据,这是每周想要的一周汇总数据。我以下列方式使用窗口函数

val df_v_3 = df_v_2.groupBy(window(col("DateTime"), "7 day"))
      .agg(sum("Value") as "aggregate_sum")
      .select("window.start", "window.end", "aggregate_sum")
Run Code Online (Sandbox Code Playgroud)

我在数据框中有数据作为

    DateTime,value
    2017-01-01T00:00:00.000+05:30,1.2
    2017-01-01T00:15:00.000+05:30,1.30
--
    2017-01-07T23:30:00.000+05:30,1.43
    2017-01-07T23:45:00.000+05:30,1.4
Run Code Online (Sandbox Code Playgroud)

我得到的输出为:

2016-12-29T05:30:00.000+05:30,2017-01-05T05:30:00.000+05:30,723.87
2017-01-05T05:30:00.000+05:30,2017-01-12T05:30:00.000+05:30,616.74
Run Code Online (Sandbox Code Playgroud)

它显示我的一天是从 2016 年 12 月 29 日开始,但实际数据是从 2017 年 1 月 1 日开始,为什么会出现这种保证金?

Sha*_*ica 5

对于像这样的滚动窗口,可以设置开始时间的偏移量,更多信息可以在博客中找到。使用滑动窗口,但是,通过将“窗口持续时间”和“滑动持续时间”设置为相同的值,它将与具有起始偏移量的滚动窗口相同。

语法如下,

window(column, window duration, sliding duration, starting offset)
Run Code Online (Sandbox Code Playgroud)

根据您的值,我发现 64 小时的偏移量将给出2017-01-01 00:00:00.

val data = Seq(("2017-01-01 00:00:00",1.0),
               ("2017-01-01 00:15:00",2.0),
               ("2017-01-08 23:30:00",1.43))
val df = data.toDF("DateTime","value")
  .withColumn("DateTime", to_timestamp($"DateTime", "yyyy-MM-dd HH:mm:ss"))

val df2 = df
  .groupBy(window(col("DateTime"), "1 week", "1 week", "64 hours"))
  .agg(sum("value") as "aggregate_sum")
  .select("window.start", "window.end", "aggregate_sum")
Run Code Online (Sandbox Code Playgroud)

将给出这个结果数据框:

+-------------------+-------------------+-------------+
|              start|                end|aggregate_sum|
+-------------------+-------------------+-------------+
|2017-01-01 00:00:00|2017-01-08 00:00:00|          3.0|
|2017-01-08 00:00:00|2017-01-15 00:00:00|         1.43|
+-------------------+-------------------+-------------+
Run Code Online (Sandbox Code Playgroud)