具有复杂条件的Spark SQL窗口函数

use*_*931 22 sql window-functions apache-spark apache-spark-sql pyspark

这可能是最容易通过示例解释的.假设我有一个用户登录网站的DataFrame,例如:

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

我想在此列添加一个列,指示他们何时成为网站上的活跃用户.但有一点需要注意:有一段时间用户被认为是活动的,在此期间之后,如果他们再次登录,他们的became_active日期会重置.假设这段时间是5天.然后从上表派生的所需表将是这样的:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+
Run Code Online (Sandbox Code Playgroud)

因此,特别是,SirChillingtonIV的became_active日期被重置,因为他们的第二次登录是在活动期过期之后,但是Booooooo99900098的became_active日期没有在他/她登录的第二次重置,因为它落在活动期间.

我最初的想法是使用窗口函数lag,然后使用lagged值填充became_active列; 例如,大致类似于:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
Run Code Online (Sandbox Code Playgroud)

然后,规则填写became_active日期会是这样,如果tmpnull(即,如果它是第一次登录),或者如果login_date - tmp >= 5became_active = login_date; 否则,转到下一个最近的值tmp并应用相同的规则.这表明了一种递归方法,我无法想象实现的方法.

我的问题:这是一种可行的方法,如果是这样的话,我怎么能"回头"看看早期的价值观,tmp直到我找到一个停止的地方?据我所知,我无法迭代Spark SQL的值Column.还有另一种方法来实现这个结果吗?

use*_*411 38

这是诀窍.导入一堆功能:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
Run Code Online (Sandbox Code Playgroud)

定义窗口:

val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")
Run Code Online (Sandbox Code Playgroud)

找到新会话开始的点数:

val newSession =  (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)
) > 5).cast("bigint")

val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
Run Code Online (Sandbox Code Playgroud)

查找每个会话最早的日期:

val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")
Run Code Online (Sandbox Code Playgroud)

数据集定义为:

val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), 
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")
Run Code Online (Sandbox Code Playgroud)

结果是:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14|   2012-01-11| 
|SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+
Run Code Online (Sandbox Code Playgroud)

  • @SanchitGrover如果`datediff($“ login_date”,lag($“ login_date”,1).over(userWindow))的计算结果为“ null”(框架的第一行),则得到0。 (2认同)

Use*_*345 5

重构其他答案 以配合使用Pyspark

Pyspark下面你可以做。

create data frame

df = sqlContext.createDataFrame(
[
("SirChillingtonIV", "2012-01-04"), 
("Booooooo99900098", "2012-01-04"), 
("Booooooo99900098", "2012-01-06"), 
("OprahWinfreyJr", "2012-01-10"), 
("SirChillingtonIV", "2012-01-11"), 
("SirChillingtonIV", "2012-01-14"), 
("SirChillingtonIV", "2012-08-11")
], 
("user_name", "login_date"))
Run Code Online (Sandbox Code Playgroud)

上面的代码创建了如下数据框

+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
|SirChillingtonIV|2012-01-14|
|SirChillingtonIV|2012-08-11|
+----------------+----------+
Run Code Online (Sandbox Code Playgroud)

现在我们要首先找出两者之间的区别login_date是不止5几天。

为此,请执行以下操作。

必要进口

from pyspark.sql import functions as f
from pyspark.sql import Window


# defining window partitions  
login_window = Window.partitionBy("user_name").orderBy("login_date")
session_window = Window.partitionBy("user_name", "session")

session_df = df.withColumn("session", f.sum((f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0)) > 5).cast("int")).over(login_window))
Run Code Online (Sandbox Code Playgroud)

当我们运行上述代码(如果date_diff是)时,NULLcoalesce函数将替换NULL0

+----------------+----------+-------+
|       user_name|login_date|session|
+----------------+----------+-------+
|  OprahWinfreyJr|2012-01-10|      0|
|SirChillingtonIV|2012-01-04|      0|
|SirChillingtonIV|2012-01-11|      1|
|SirChillingtonIV|2012-01-14|      1|
|SirChillingtonIV|2012-08-11|      2|
|Booooooo99900098|2012-01-04|      0|
|Booooooo99900098|2012-01-06|      0|
+----------------+----------+-------+


# add became_active column by finding the `min login_date` for each window partitionBy `user_name` and `session` created in above step
final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04|
|SirChillingtonIV|2012-01-11|   2012-01-11|
|SirChillingtonIV|2012-01-14|   2012-01-11|
|SirChillingtonIV|2012-08-11|   2012-08-11|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+
Run Code Online (Sandbox Code Playgroud)