Spark Window函数最后一个非空值

Yuc*_*ong 3 apache-spark apache-spark-sql pyspark

我们有一个用户事件的时间序列数据库,如下所示:

timestamp             user_id     event            ticke_type     error_type 
2019-06-06 14:33:31   user_a      choose_ticket    ticke_b        NULL
2019-06-06 14:34:31   user_b      choose_ticket    ticke_f        NULL
2019-06-06 14:36:31   user_a      booing_error     NULL           error_c  
2019-06-06 14:37:31   user_a      choose_ticket    ticke_h        NULL
2019-06-06 14:38:31   user_a      booing_error     NULL           error_d
2019-06-06 14:39:31   user_a      booing_error     NULL           error_e
Run Code Online (Sandbox Code Playgroud)

这是我们需要的一个用例:

为了调查哪种门票类型导致了一些预订错误,我们必须查看仅在较早的活动中可用的门票类型choose_ticket

在这种情况下,我们要查找的是每个booking_error事件,找到同一用户的上一个choose_ticket事件并将那里的票证类型合并到该booking_error事件。

所以理想情况下,我们想要的输出是:

timestamp             user_id     event            ticke_type     error_type 
2019-06-06 14:36:31   user_a      booing_error     ticke_b        error_c  
2019-06-06 14:38:31   user_a      booing_error     ticke_h        error_d
2019-06-06 14:39:31   user_a      booing_error     ticke_h        error_e
Run Code Online (Sandbox Code Playgroud)

我能找到的最接近的是Spark add new columns to dataframe with value from previous row,这允许我们从前一个事件中获取属性并将其应用于之后的事件。

这几乎有效,除了当有多个事件时(booing_error在本例中),在这种情况下只有第一个事件可以获得所需的属性。例如,我们将从上面的 SO 链接中得到解决方案:

timestamp             user_id     event            ticke_type     error_type 
2019-06-06 14:36:31   user_a      booing_error     ticke_b        error_c  
2019-06-06 14:38:31   user_a      booing_error     ticke_h        error_d
2019-06-06 14:39:31   user_a      booing_error     NULL           error_e
Run Code Online (Sandbox Code Playgroud)

总而言之,对于给定的行,如何找到符合特定条件的前一行并“挑选”其属性?

最好的方法是什么?

C.S*_*lly 6

org.apache.spark.sql.functions.last就是您正在寻找的。您可以重命名“最接近的”列以替换最后的tick_type。

scala> df.show
+-------------------+-------+-------------+----------+----------+
|          timestamp|user_id|        event|ticke_type|error_type|
+-------------------+-------+-------------+----------+----------+
|2019-06-06 14:33:31| user_a|choose_ticket|   ticke_b|      null|
|2019-06-06 14:34:31| user_b|choose_ticket|   ticke_f|      null|
|2019-06-06 14:36:31| user_a|booking_error|      null|   error_c|
|2019-06-06 14:37:31| user_a|choose_ticket|   ticke_h|      null|
|2019-06-06 14:38:31| user_a|booking_error|      null|   error_d|
|2019-06-06 14:39:31| user_a|booking_error|      null|   error_e|
+-------------------+-------+-------------+----------+----------+

scala> val overColumns = Window.partitionBy("user_id").orderBy("timestamp")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@70dc8c9a

scala> df.withColumn("closest", 
  org.apache.spark.sql.functions.last("ticke_type", true).over(overColumns)).filter($"event" === "booking_error").show
+-------------------+-------+-------------+----------+----------+-------+
|          timestamp|user_id|        event|ticke_type|error_type|closest|
+-------------------+-------+-------------+----------+----------+-------+
|2019-06-06 14:36:31| user_a|booking_error|      null|   error_c|ticke_b|
|2019-06-06 14:38:31| user_a|booking_error|      null|   error_d|ticke_h|
|2019-06-06 14:39:31| user_a|booking_error|      null|   error_e|ticke_h|
+-------------------+-------+-------------+----------+----------+-------+
Run Code Online (Sandbox Code Playgroud)