ebe*_*tbm 5 window-functions apache-spark apache-spark-sql pyspark
我的数据看起来像这样:
userid,eventtime,location_point
4e191908,2017-06-04 03:00:00,18685891
4e191908,2017-06-04 03:04:00,18685891
3136afcb,2017-06-04 03:03:00,18382821
661212dd,2017-06-04 03:06:00,80831484
40e8a7c3,2017-06-04 03:12:00,18825769
Run Code Online (Sandbox Code Playgroud)
我想添加一个新的布尔列,如果userid
在同一个5分钟窗口内有2个或更多,则标记为true location_point
.我有一个想法,使用lag
函数来查找由userid
当前时间戳和接下来的5分钟之间的范围分隔的窗口:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col
days = lambda i: i * 60*5
windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5))
lastURN = F.lag(col("location_point"), 1).over(windowSpec)
visitCheck = (last_location_point == output.location_pont)
output.withColumn("visit_check", visitCheck).select("userid","eventtime", "location_pont", "visit_check")
Run Code Online (Sandbox Code Playgroud)
当我使用RangeBetween函数时,此代码给出了一个分析异常:
AnalysisException:u'Window当前行和1500行之间的帧范围必须匹配所需的帧前1和前1之间的行;
你知道解决这个问题的方法吗?
pla*_*nne 10
鉴于您的数据:
让我们添加一个带有时间戳的列,以秒为单位:
df = df.withColumn('timestamp',df_taf.eventtime.astype('Timestamp').cast("long"))
df.show()
+--------+-------------------+--------------+----------+
| userid| eventtime|location_point| timestamp|
+--------+-------------------+--------------+----------+
|4e191908|2017-06-04 03:00:00| 18685891|1496545200|
|4e191908|2017-06-04 03:04:00| 18685891|1496545440|
|3136afcb|2017-06-04 03:03:00| 18382821|1496545380|
|661212dd|2017-06-04 03:06:00| 80831484|1496545560|
|40e8a7c3|2017-06-04 03:12:00| 18825769|1496545920|
|4e191908|2017-06-04 03:11:30| 18685891|1496545890|
+--------+-------------------+--------------+----------+
Run Code Online (Sandbox Code Playgroud)
现在,让我们定义一个窗口函数,其中包含按location_point的分区,按时间戳的顺序以及-300s到当前时间之间的范围.我们可以计算此窗口中的元素数量,并将这些数据放在名为"occurences in_5_min"的列中:
w = Window.partitionBy('location_point').orderBy('timestamp').rangeBetween(-60*5,0)
df = df.withColumn('occurrences_in_5_min',F.count('timestamp').over(w))
df.show()
+--------+-------------------+--------------+----------+--------------------+
| userid| eventtime|location_point| timestamp|occurrences_in_5_min|
+--------+-------------------+--------------+----------+--------------------+
|40e8a7c3|2017-06-04 03:12:00| 18825769|1496545920| 1|
|3136afcb|2017-06-04 03:03:00| 18382821|1496545380| 1|
|661212dd|2017-06-04 03:06:00| 80831484|1496545560| 1|
|4e191908|2017-06-04 03:00:00| 18685891|1496545200| 1|
|4e191908|2017-06-04 03:04:00| 18685891|1496545440| 2|
|4e191908|2017-06-04 03:11:30| 18685891|1496545890| 1|
+--------+-------------------+--------------+----------+--------------------+
Run Code Online (Sandbox Code Playgroud)
现在,如果在特定位置的最近5分钟内出现的次数严格大于1,则可以使用True添加所需的列:
add_bool = udf(lambda col : True if col>1 else False, BooleanType())
df = df.withColumn('already_occured',add_bool('occurrences_in_5_min'))
df.show()
+--------+-------------------+--------------+----------+--------------------+---------------+
| userid| eventtime|location_point| timestamp|occurrences_in_5_min|already_occured|
+--------+-------------------+--------------+----------+--------------------+---------------+
|40e8a7c3|2017-06-04 03:12:00| 18825769|1496545920| 1| false|
|3136afcb|2017-06-04 03:03:00| 18382821|1496545380| 1| false|
|661212dd|2017-06-04 03:06:00| 80831484|1496545560| 1| false|
|4e191908|2017-06-04 03:00:00| 18685891|1496545200| 1| false|
|4e191908|2017-06-04 03:04:00| 18685891|1496545440| 2| true|
|4e191908|2017-06-04 03:11:30| 18685891|1496545890| 1| false|
+--------+-------------------+--------------+----------+--------------------+---------------+
Run Code Online (Sandbox Code Playgroud)
rangeBetween
只是对于像这样的非聚合函数没有意义lag
。lag
始终使用由offset参数表示的特定行,因此指定frame是没有意义的。
要获得时间序列的窗口,可以将window
分组与标准聚合一起使用:
from pyspark.sql.functions import window, countDistinct
(df
.groupBy("location_point", window("eventtime", "5 minutes"))
.agg( countDistinct("userid")))
Run Code Online (Sandbox Code Playgroud)
您可以添加更多参数来修改幻灯片持续时间。
如果通过location
以下方式进行分区,则可以尝试使用窗口功能进行类似操作:
windowSpec = (W.partitionBy(col("location"))
.orderBy(col("eventtime").cast("timestamp").cast("long"))
.rangeBetween(0, days(5)))
df.withColumn("id_count", countDistinct("userid").over(windowSpec))
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
5535 次 |
最近记录: |