What do the "startTime" argument of the pyspark.sql.functions.window function and window.start do?


Here is an example:

df=spark.createDataFrame([
    (1,"2017-05-15 23:12:26",2.5),
    (1,"2017-05-09 15:26:58",3.5),
    (1,"2017-05-18 15:26:58",3.6),
    (2,"2017-05-15 15:24:25",4.8),
    (3,"2017-05-25 15:14:12",4.6)],["index","time","val"]).orderBy("index","time")
df.show()
+-----+-------------------+---+
|index|               time|val|
+-----+-------------------+---+
|    1|2017-05-09 15:26:58|3.5|
|    1|2017-05-15 23:12:26|2.5|
|    1|2017-05-18 15:26:58|3.6|
|    2|2017-05-15 15:24:25|4.8|
|    3|2017-05-25 15:14:12|4.6|
+-----+-------------------+---+

For the function pyspark.sql.functions.window:

window(timeColumn, windowDuration, slideDuration=None, startTime=None)

timeColumn: The time column must be of TimestampType.

windowDuration: Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.

slideDuration: If the 'slideDuration' is not provided, the windows will be tumbling windows.

startTime: the startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
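For example, the hourly windows with a 15-minute offset described in the docstring would look like this (a sketch; the DataFrame `events` and its timestamp column `ts` are placeholders):

from pyspark.sql import functions as F

# Hourly tumbling windows that start 15 minutes past the hour,
# e.g. 12:15-13:15, 13:15-14:15, ...
events.groupBy(F.window("ts", windowDuration="1 hour", startTime="15 minutes")).count()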

I want to aggregate the column "val" once every 5 days, so I set the parameter "slideDuration" to the string value "5 day":

timeColumn="time", windowDuration="5 day", slideDuration="5 day"

The code is as follows:

import pyspark.sql.functions as F

df2 = df.groupBy("index", F.window("time", windowDuration="5 day", slideDuration="5 day")) \
        .agg(F.sum("val").alias("sum_val"))
df2.select("index", "window.start", "window.end", "sum_val").show(truncate=False)

When I look at the values of "window.start", the windows do not start from the minimum time in the "time" column, nor from a time I set beforehand, but from some other time.

The result is as follows:

+-----+---------------------+---------------------+-------+
|index|start                |end                  |sum_val|
+-----+---------------------+---------------------+-------+
|1    |2017-05-09 08:00:00.0|2017-05-14 08:00:00.0|3.5    |
|1    |2017-05-14 08:00:00.0|2017-05-19 08:00:00.0|6.1    |
|2    |2017-05-14 08:00:00.0|2017-05-19 08:00:00.0|4.8    |
|3    |2017-05-24 08:00:00.0|2017-05-29 08:00:00.0|4.6    |
+-----+---------------------+---------------------+-------+

When I set the parameter "startTime" to "0 second" (the code is as follows):

timeColumn="time", windowDuration="5 day", slideDuration="5 day", startTime="0 second"

df2 = df.groupBy("index", F.window("time", windowDuration="5 day", slideDuration="5 day", startTime="0 second")).agg(F.sum("val").alias("sum_val"))

the result still does not start from the minimum time in the "time" column.

So how can I make this function start its windows from the minimum time in the "time" column, or from a time I set myself, such as "2017-05-09 15:25:30"? Thank you very much for helping me out of this problem.

The official description of 'startTime' is as follows:

The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. 
For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15...
provide `startTime` as `15 minutes`.

References:

1. What does the 'startTime' argument of the 'pyspark.sql.functions.window' function do?

2. https://github.com/apache/spark/pull/12008

3. http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.functions.window

zer*_*323 5

The problem you are experiencing is completely unrelated to startTime, and it has two components:

  • Spark's timestamp semantics, where timestamps are always handled with respect to the local timezone. Based on the offsets shown in the output, we can conclude that the JVM uses GMT+8 or an equivalent timezone. Consider the following two scenarios:

    >>> from pyspark.sql.functions import window
    >>>
    >>> spark.conf.get("spark.driver.extraJavaOptions")
    '-Duser.timezone=GMT+8'
    >>> spark.conf.get("spark.executor.extraJavaOptions")
    '-Duser.timezone=GMT+8'
    >>> str(spark.sparkContext._jvm.java.util.TimeZone.getDefault())
    'sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]'
    >>>
    >>> df = spark.createDataFrame([(1,"2017-05-15 23:12:26",2.5)], ["index","time","val"])
    >>> (df
    ...     .withColumn("w", window("time" ,windowDuration="5 days" ,slideDuration="5 days"))
    ...     .show(1, False))
    ...     
    +-----+-------------------+---+---------------------------------------------+
    |index|time               |val|w                                            |
    +-----+-------------------+---+---------------------------------------------+
    |1    |2017-05-15 23:12:26|2.5|[2017-05-14 08:00:00.0,2017-05-19 08:00:00.0]|
    +-----+-------------------+---+---------------------------------------------+
    

    versus

    >>> from pyspark.sql.functions import window
    >>>
    >>> spark.conf.get("spark.driver.extraJavaOptions")
    '-Duser.timezone=UTC'
    >>> spark.conf.get("spark.executor.extraJavaOptions")
    '-Duser.timezone=UTC'
    >>> str(spark.sparkContext._jvm.java.util.TimeZone.getDefault())
    'sun.util.calendar.ZoneInfo[id="UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]'
    >>>
    >>> df = spark.createDataFrame([(1,"2017-05-15 23:12:26",2.5)], ["index","time","val"])
    >>> (df
    ...     .withColumn("w", window("time" ,windowDuration="5 days" ,slideDuration="5 days"))
    ...     .show(1, False))
    ... 
    +-----+-------------------+---+---------------------------------------------+
    |index|time               |val|w                                            |
    +-----+-------------------+---+---------------------------------------------+
    |1    |2017-05-15 23:12:26|2.5|[2017-05-14 00:00:00.0,2017-05-19 00:00:00.0]|
    +-----+-------------------+---+---------------------------------------------+
    

    As you can see, the output is adjusted according to the local timezone, while the input string is parsed as a UTC timestamp.
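    If you need the boundaries computed for a particular timezone regardless of the JVM default, one option is to set the session timezone (a sketch; the spark.sql.session.timeZone setting is available since Spark 2.2):

    # Make timestamp handling, and hence window boundaries, use UTC
    spark.conf.set("spark.sql.session.timeZone", "UTC")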

  • window semantics. If you take a look at the execution plan

    >>> df.withColumn("w", window("time",windowDuration="5 days",slideDuration="5 days")).explain(False)
    == Physical Plan ==
    *Project [index#21L, time#22, val#23, window#68 AS w#67]
    +- *Filter (((isnotnull(time#22) && isnotnull(window#68)) && (cast(time#22 as timestamp) >= window#68.start)) && (cast(time#22 as timestamp) < window#68.end))
       +- *Expand [List(named_struct(start, ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 0) - 1) * 432000000000) + 0), end, ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 0) - 1) * 432000000000) + 432000000000)), index#21L, time#22, val#23), List(named_struct(start, ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 1) - 1) * 432000000000) + 0), end, ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 1) - 1) * 432000000000) + 432000000000)), index#21L, time#22, val#23)], [window#68, index#21L, time#22, val#23]
          +- Scan ExistingRDD[index#21L,time#22,val#23]
    

    and focus on a single component:

    ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 0) - 1) * 432000000000)
    

    you can see that the window takes the ceiling of the numeric value, effectively truncating the timestamp to whole intervals.
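    A quick back-of-the-envelope check of that expression in plain Python (a sketch assuming the GMT+8 session from the first scenario) reproduces the 2017-05-14 08:00:00 window start seen above:

    import math
    from datetime import datetime, timezone, timedelta

    tz = timezone(timedelta(hours=8))                  # the JVM timezone above
    ts = datetime(2017, 5, 15, 23, 12, 26, tzinfo=tz)  # "2017-05-15 23:12:26"
    slide_us = 5 * 24 * 3600 * 10**6                   # 5 days in microseconds
    ts_us = int(ts.timestamp()) * 10**6                # precisetimestamp(...)

    # (CEIL((ts - 0) / 4.32E11) - 1) * 432000000000 + 0, per the plan above
    start_us = (math.ceil(ts_us / slide_us) - 1) * slide_us
    print(datetime.fromtimestamp(start_us // 10**6, tz))
    # 2017-05-14 08:00:00+08:00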

Finally, the startTime in

df.groupBy("index",F.window("time",windowDuration="5 day",slideDuration="5  day",startTime="0 second"))

has no effect at all, since it behaves the same as the default (no offset). If anything, you can try:

from pyspark.sql.functions import col, min as min_, window

# Milliseconds between the earliest timestamp and its local midnight
(startTime, ) = (df
    .select(min_(col("time").cast("timestamp")).alias("ts"))
    .select(
        ((col("ts").cast("double") -
          col("ts").cast("date").cast("timestamp").cast("double")
         ) * 1000).cast("integer"))
    .first())

w = window(
    "time",
    windowDuration="5 days",
    slideDuration="5 days",
    startTime="{} milliseconds".format(startTime))

df.withColumn("w", w).show(1, False)
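This computes, on the driver, how many milliseconds past its local midnight the earliest value in "time" falls, and feeds that back in as startTime, shifting the whole window grid by that offset instead of leaving it aligned to midnight.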