Spark SQL: How to create a sessionId for user-item events

Nor*_*mal 2 sql apache-spark apache-spark-sql

Suppose I have a dataset like this:

|    item    | event       | timestamp |   user    |
|:-----------|------------:|:---------:|:---------:|
| titanic    | view        |     1     |    1      |
| titanic    | add to bag  |     2     |    1      | 
| titanic    | close       |     3     |    1      | 
| avatar     | view        |     6     |    1      |
| avatar     | close       |     10    |    1      |
| titanic    | view        |     20    |    1      |
| titanic    | purchase    |     30    |    1      |

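and so on. For each user, I need to compute a sessionId that groups consecutive events relating to the same item.

So for this particular data, the output should look like this: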

|    item    | event       | timestamp |   user    |   sessionId    |
|:-----------|------------:|:---------:|:---------:|:--------------:|
| titanic    | view        |     1     |    1      |   session1     |
| titanic    | add to bag  |     2     |    1      |   session1     |
| titanic    | close       |     3     |    1      |   session1     |
| avatar     | view        |     6     |    1      |   session2     |
| avatar     | close       |     10    |    1      |   session2     |
| titanic    | view        |     20    |    1      |   session3     |
| titanic    | purchase    |     30    |    1      |   session3     |

I tried an approach similar to the one described in "Spark: How to create a sessionId based on userId and timestamp", with this window:

Window.partitionBy("user", "item").orderBy("timestamp")

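But this doesn't work, because the same user-item combination can appear in different sessions. For example, look at session1 and session3:
with that window they end up in the same partition and become one session. I need help finding another way to implement this.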

Leo*_*o C 6

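Here is one approach: first generate a column that holds the timestamp on rows that start a session and null otherwise (using a conditional when), then use last(ts, ignoreNulls) with rowsBetween to backfill each row with the last non-null timestamp value, and finally construct sessionId using dense_rank: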

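import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// Outside spark-shell, toDF and $"..." also need: import spark.implicits._
// (assuming the SparkSession value is named spark)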

val df = Seq(
  ("titanic", "view", 1, 1),
  ("titanic", "add to bag", 2, 1),
  ("titanic", "close", 3, 1),
  ("avatar", "view", 6, 1),
  ("avatar", "close", 10, 1),
  ("titanic", "view", 20, 1),
  ("titanic", "purchase", 30, 1)
).toDF("item", "event", "timestamp", "user")

val win1 = Window.partitionBy($"user").orderBy($"timestamp")
val win2 = Window.partitionBy($"user").orderBy($"sessTS")

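df.
  // firstTS: the timestamp on rows that start a session (first row per user,
  // or item differs from the previous row); null on all other rows
  withColumn( "firstTS",
    when( row_number().over(win1) === 1 || $"item" =!= lag($"item", 1).over(win1),
      $"timestamp" )
  ).
  // sessTS: carry the most recent non-null firstTS forward to every row
  withColumn( "sessTS",
    last($"firstTS", ignoreNulls = true).
      over(win1.rowsBetween(Window.unboundedPreceding, Window.currentRow))
  ).
  // sessionId: dense rank of sessTS within each user, prefixed with "session"
  withColumn("sessionId", concat(lit("session"), dense_rank().over(win2))).
  show()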

// +-------+----------+---------+----+-------+------+---------+
// |   item|     event|timestamp|user|firstTS|sessTS|sessionId|
// +-------+----------+---------+----+-------+------+---------+
// |titanic|      view|        1|   1|      1|     1| session1|
// |titanic|add to bag|        2|   1|   null|     1| session1|
// |titanic|     close|        3|   1|   null|     1| session1|
// | avatar|      view|        6|   1|      6|     6| session2|
// | avatar|     close|       10|   1|   null|     6| session2|
// |titanic|      view|       20|   1|     20|    20| session3|
// |titanic|  purchase|       30|   1|   null|    20| session3|
// +-------+----------+---------+----+-------+------+---------+
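
To trace the three steps without a Spark session, the same logic can be sketched on a plain in-memory collection. This is only an illustration, not part of the answer's code: the names SessionSketch, Event, withSessionIds and sample are hypothetical, and it assumes the events for one user fit in memory.

```scala
object SessionSketch {
  // Hypothetical record type mirroring the question's columns.
  final case class Event(item: String, event: String, timestamp: Int, user: Int)

  // Label each event with "sessionN": per user, ordered by timestamp, a new
  // session starts whenever the item differs from the previous row's item.
  def withSessionIds(events: Seq[Event]): Seq[(Event, String)] =
    events.groupBy(_.user).toSeq.sortBy(_._1).flatMap { case (_, rows) =>
      val sorted = rows.sortBy(_.timestamp).toVector
      // Step 1 (firstTS): Some(timestamp) on rows that open a session, else None.
      val firstTS = sorted.zipWithIndex.map { case (e, i) =>
        if (i == 0 || sorted(i - 1).item != e.item) Some(e.timestamp) else None
      }
      // Step 2 (sessTS): carry the last defined firstTS forward,
      // analogous to last(..., ignoreNulls = true) over a running frame.
      val sessTS = firstTS
        .scanLeft(Option.empty[Int])((prev, cur) => cur.orElse(prev))
        .tail
        .flatten
      // Step 3 (sessionId): dense rank of sessTS within the user.
      val rank = sessTS.distinct.sorted.zipWithIndex.map { case (ts, i) => ts -> (i + 1) }.toMap
      sorted.zip(sessTS).map { case (e, ts) => (e, s"session${rank(ts)}") }
    }

  // The sample rows from the question.
  val sample: Seq[Event] = Seq(
    Event("titanic", "view", 1, 1),
    Event("titanic", "add to bag", 2, 1),
    Event("titanic", "close", 3, 1),
    Event("avatar", "view", 6, 1),
    Event("avatar", "close", 10, 1),
    Event("titanic", "view", 20, 1),
    Event("titanic", "purchase", 30, 1)
  )

  def main(args: Array[String]): Unit =
    withSessionIds(sample).foreach { case (e, sid) =>
      println(s"${e.item}\t${e.event}\t${e.timestamp}\t${e.user}\t$sid")
    }
}
```

The key point it shows is that rows are partitioned by user only; the item is used just to detect where a session starts, which is why the two titanic runs get different ids.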