pyspark.sql.functions.window函数的startTime参数有什么作用?

Zha*_*ong 5 dataframe apache-spark apache-spark-sql pyspark

在官方文档中,只有一个简单的示例:

startTime是相对于1970-01-01 00:00:00 UTC的偏移量,使用该偏移量可以启动窗口间隔。例如,为了使每小时的滚动窗口从每小时的15分钟开始,例如12:15-13:15、13:15-14:15 ...提供startTime15 minutes

但是我想知道它如何与所有参数一起工作。

例如:

ts_list = map(lambda x: datetime.datetime(2017, 1, 9, 9, 0, 10) + datetime.timedelta(seconds=x), range(30))
rdd = spark.sparkContext.parallelize(ts_list).map(lambda x: (x, 1))
df = spark.createDataFrame(rdd, schema=['dt', 'val'])
win = df.groupBy(window("dt", "5 seconds", '4 seconds', '3 seconds')).agg(sum("val").alias("sum"))
pprint.pprint(win.select(win['window']['start'].cast('string').alias('start'),
                         win['window']['end'].cast('string').alias('end')).collect())
Run Code Online (Sandbox Code Playgroud)

输出:

[Row(start=u'2017-01-09 09:00:19', end=u'2017-01-09 09:00:24'),                 
 Row(start=u'2017-01-09 09:00:35', end=u'2017-01-09 09:00:40'),
 Row(start=u'2017-01-09 09:00:27', end=u'2017-01-09 09:00:32'),
 Row(start=u'2017-01-09 09:00:07', end=u'2017-01-09 09:00:12'),
 Row(start=u'2017-01-09 09:00:31', end=u'2017-01-09 09:00:36'),
 Row(start=u'2017-01-09 09:00:39', end=u'2017-01-09 09:00:44'),
 Row(start=u'2017-01-09 09:00:11', end=u'2017-01-09 09:00:16'),
 Row(start=u'2017-01-09 09:00:23', end=u'2017-01-09 09:00:28'),
 Row(start=u'2017-01-09 09:00:15', end=u'2017-01-09 09:00:20')]
Run Code Online (Sandbox Code Playgroud)

所以为什么?

Cam*_*uez 6

它与您的数据何时开始无关。当然,只有在该窗口中有一些数据时才会出现第一个窗口。但是 startTime 与您的数据无关。正如文档所说, startTime 是相对于 1970-01-01 19:00:00 UTC 的偏移量,用于开始窗口间隔。如果你创建一个这样的窗口:
w = F.window("date_field", "7 days", startTime='6 days')

spark 将生成从 1970-01-06 开始的 7 天的窗口:

1970-01-06 19:00:00, 1970-01-13 19:00:00
1970-01-13 19:00:00, 1970-01-20 19:00:00
1970-01-20 19:00 :00, 1970-01-27 19:00:00
...
2017-05-16 19:00:00, 2017-05-23 19:00:00
(如果你继续计算,你会得到这个日期).. .
但是您只会看到与数据框日期相关的窗口。19:00:00 是因为我的时区是 -05,
如果您创建这样的窗口:

w = F.window("date_field", "7 days", startTime='2 days')

spark 将生成从 1970-01-02 开始的 7 天的窗口:

1970-01-02 19:00:00, 1970-01-09 19:00:00
1970-01-09 19:00:00, 1970-01-16 19:00:00
...
2017-05-19 19:00:00, 2017-05-26 19:00:00
(如果你继续计算你会得到这个日期)
......

同样你只会看到与你的数据框的日期相关的窗口。

那么,如何计算数据窗口的开始日期?
您只需要计算自 1970-01-01 以来开始日期的天数,然后将其除以您的窗口长度并取余数。这将是偏移天数的开始时间。


我会用一个例子来解释它:假设你需要你的 Windows 开始于2017-05-21窗口的长度是7天. 我将为示例创建一个虚拟数据框。

row = Row("id", "date_field", "value")
df = sc.parallelize([
row(1, "2017-05-23", 5.0),
row(1, "2017-05-26", 10.0),
row(1, "2017-05-29", 4.0),
row(1, "2017-06-10", 3.0),]).toDF()

start_date = datetime(2017, 5, 21, 19, 0, 0) # 19:00:00 because my 
timezone 
days_since_1970_to_start_date =int(time.mktime(start_date.timetuple())/86400)
offset_days = days_since_1970_to_start_date % 7

w = F.window("date_field", "7 days", startTime='{} days'.format(
                                        offset_days))

df.groupby("id", w).agg(F.sum("value")).orderBy("window.start").show(10, False)
Run Code Online (Sandbox Code Playgroud)

你会得到:

+---+------------------------------------------+----------+
|id |window                                    |sum(value)|
+---+------------------------------------------+----------+
|1  |[2017-05-21 19:00:00, 2017-05-28 19:00:00]|15.0      |
|1  |[2017-05-28 19:00:00, 2017-06-04 19:00:00]|4.0       |
|1  |[2017-06-04 19:00:00, 2017-06-11 19:00:00]|3.0       |
+---+------------------------------------------+----------+
Run Code Online (Sandbox Code Playgroud)


zer*_*323 3

让我们一步一步来。

  • 您的数据开始于2017-01-09 09:00:10

    df.orderBy("dt").show(3, False)
    
    Run Code Online (Sandbox Code Playgroud)
    +---------------------+---+
    |dt                   |val|
    +---------------------+---+
    |2017-01-09 09:00:10.0|1  |
    |2017-01-09 09:00:11.0|1  |
    |2017-01-09 09:00:12.0|1  |
    +---------------------+---+
    
    Run Code Online (Sandbox Code Playgroud)
  • 第一个完整小时是2017-01-09 09:00:00.0

    from pyspark.sql.functions import min as min_, date_format
    (df
       .groupBy()
       .agg(date_format(min_("dt"), "yyyy-MM-dd HH:00:00"))
       .show(1, False))
    
    Run Code Online (Sandbox Code Playgroud)
    +-----------------------------------------+
    |date_format(min(dt), yyyy-MM-dd HH:00:00)|
    +-----------------------------------------+
    |2017-01-09 09:00:00                      |
    +-----------------------------------------+
    
    Run Code Online (Sandbox Code Playgroud)
  • 因此,第一个窗口将从 +(3 秒)开始2017-01-09 09:03:002017-01-09 09:00:00在( ++ )startTime结束。2017-01-09 09:08:002017-01-09 09:00:00startTimewindowDuration

    09:03:00该窗口为空([ , )范围内没有数据09:08:00)。

  • 第一个(和第二个)数据点将落入下一个窗口,即 [ 09:00:07.0, ) ,从+ + 1 *09:00:12.0开始。2017-01-09 09:00:00startTimeslideDuration

    win.orderBy("window.start").show(3, False)
    
    Run Code Online (Sandbox Code Playgroud)
    +---------------------------------------------+---+
    |window                                       |sum|
    +---------------------------------------------+---+
    |[2017-01-09 09:00:07.0,2017-01-09 09:00:12.0]|2  |
    |[2017-01-09 09:00:11.0,2017-01-09 09:00:16.0]|5  |
    |[2017-01-09 09:00:15.0,2017-01-09 09:00:20.0]|5  |
    +---------------------------------------------+---+
    
    Run Code Online (Sandbox Code Playgroud)

    下一个窗口开始2017-01-09 09:00:00+ startTime+ n * slideDurationfor nin1..