Yuc*_*ong 3 apache-spark apache-spark-sql pyspark
我们有一个用户事件的时间序列数据库,如下所示:
timestamp user_id event ticke_type error_type
2019-06-06 14:33:31 user_a choose_ticket ticke_b NULL
2019-06-06 14:34:31 user_b choose_ticket ticke_f NULL
2019-06-06 14:36:31 user_a booing_error NULL error_c
2019-06-06 14:37:31 user_a choose_ticket ticke_h NULL
2019-06-06 14:38:31 user_a booing_error NULL error_d
2019-06-06 14:39:31 user_a booing_error NULL error_e
Run Code Online (Sandbox Code Playgroud)
这是我们需要的一个用例:
为了调查哪种门票类型导致了一些预订错误,我们必须查看仅在较早的活动中可用的门票类型choose_ticket。
在这种情况下,我们要查找的是每个booking_error事件,找到同一用户的上一个choose_ticket事件并将那里的票证类型合并到该booking_error事件。
所以理想情况下,我们想要的输出是:
timestamp user_id event ticke_type error_type
2019-06-06 14:36:31 user_a booing_error ticke_b error_c
2019-06-06 14:38:31 user_a booing_error ticke_h error_d
2019-06-06 14:39:31 user_a booing_error ticke_h error_e
Run Code Online (Sandbox Code Playgroud)
我能找到的最接近的是Spark add new columns to dataframe with value from previous row,这允许我们从前一个事件中获取属性并将其应用于之后的事件。
这几乎有效,除了当有多个事件时(booing_error在本例中),在这种情况下只有第一个事件可以获得所需的属性。例如,我们将从上面的 SO 链接中得到解决方案:
timestamp user_id event ticke_type error_type
2019-06-06 14:36:31 user_a booing_error ticke_b error_c
2019-06-06 14:38:31 user_a booing_error ticke_h error_d
2019-06-06 14:39:31 user_a booing_error NULL error_e
Run Code Online (Sandbox Code Playgroud)
总而言之,对于给定的行,如何找到符合特定条件的前一行并“挑选”其属性?
最好的方法是什么?
org.apache.spark.sql.functions.last就是您正在寻找的。您可以重命名“最接近的”列以替换最后的tick_type。
scala> df.show
+-------------------+-------+-------------+----------+----------+
| timestamp|user_id| event|ticke_type|error_type|
+-------------------+-------+-------------+----------+----------+
|2019-06-06 14:33:31| user_a|choose_ticket| ticke_b| null|
|2019-06-06 14:34:31| user_b|choose_ticket| ticke_f| null|
|2019-06-06 14:36:31| user_a|booking_error| null| error_c|
|2019-06-06 14:37:31| user_a|choose_ticket| ticke_h| null|
|2019-06-06 14:38:31| user_a|booking_error| null| error_d|
|2019-06-06 14:39:31| user_a|booking_error| null| error_e|
+-------------------+-------+-------------+----------+----------+
scala> val overColumns = Window.partitionBy("user_id").orderBy("timestamp")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@70dc8c9a
scala> df.withColumn("closest",
org.apache.spark.sql.functions.last("ticke_type", true).over(overColumns)).filter($"event" === "booking_error").show
+-------------------+-------+-------------+----------+----------+-------+
| timestamp|user_id| event|ticke_type|error_type|closest|
+-------------------+-------+-------------+----------+----------+-------+
|2019-06-06 14:36:31| user_a|booking_error| null| error_c|ticke_b|
|2019-06-06 14:38:31| user_a|booking_error| null| error_d|ticke_h|
|2019-06-06 14:39:31| user_a|booking_error| null| error_e|ticke_h|
+-------------------+-------+-------------+----------+----------+-------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7613 次 |
| 最近记录: |