Pyspark:自定义窗口功能

hyp*_*c54 4 window-functions apache-spark apache-spark-sql pyspark

目前我正在试图提取系列连续出现在PySpark数据帧和订单/对他们进行排名,如下图所示(为方便起见,我已经下令初始数据框user_idtimestamp):

df_ini
Run Code Online (Sandbox Code Playgroud)
+-------+--------------------+------------+
|user_id|     timestamp      |  actions   |
+-------+--------------------+------------+
| 217498|           100000001|    'A'     |
| 217498|           100000025|    'A'     |
| 217498|           100000124|    'A'     |
| 217498|           100000152|    'B'     |
| 217498|           100000165|    'C'     |
| 217498|           100000177|    'C'     |
| 217498|           100000182|    'A'     |
| 217498|           100000197|    'B'     |
| 217498|           100000210|    'B'     |
| 854123|           100000005|    'A'     |
| 854123|           100000007|    'A'     |
| etc.
Run Code Online (Sandbox Code Playgroud)

至 :

expected df_transformed
Run Code Online (Sandbox Code Playgroud)
+-------+------------+------------+------------+
|user_id|  actions   | nb_of_occ  |    order   |
+-------+------------+------------+------------+
| 217498|    'A'     |      3     |     1      |
| 217498|    'B'     |      1     |     2      |
| 217498|    'C'     |      2     |     3      |
| 217498|    'A'     |      1     |     4      |
| 217498|    'B'     |      2     |     5      |
| 854123|    'A'     |      2     |     1      |
| etc.
Run Code Online (Sandbox Code Playgroud)

我的猜测是我必须使用一个智能窗口函数,通过user_id和actions对表进行分区,但仅当这些操作是连续的时候!哪个我想不通怎么办......

如果有人在PySpark中遇到这种类型的转换,我会很高兴得到一个提示!

干杯

use*_*411 7

这是一种非常常见的模式,可以通过几个步骤使用窗口函数表示.首先导入所需的功能:

from pyspark.sql.functions import sum as sum_, lag, col, coalesce, lit
from pyspark.sql.window import Window
Run Code Online (Sandbox Code Playgroud)

接下来定义一个窗口:

w = Window.partitionBy("user_id").orderBy("timestamp")
Run Code Online (Sandbox Code Playgroud)

标记每个组的第一行:

is_first = coalesce(
  (lag("actions", 1).over(w) != col("actions")).cast("bigint"),
  lit(1)
)
Run Code Online (Sandbox Code Playgroud)

定义order:

order = sum_("is_first").over(w)
Run Code Online (Sandbox Code Playgroud)

并将所有部分与聚合组合在一起:

(df
    .withColumn("is_first", is_first)
    .withColumn("order", order)
    .groupBy("user_id", "actions", "order")
    .count())
Run Code Online (Sandbox Code Playgroud)

如果您定义df为:

df = sc.parallelize([
    (217498, 100000001, 'A'), (217498, 100000025, 'A'), (217498, 100000124, 'A'),
    (217498, 100000152, 'B'), (217498, 100000165, 'C'), (217498, 100000177, 'C'),
    (217498, 100000182, 'A'), (217498, 100000197, 'B'), (217498, 100000210, 'B'),
    (854123, 100000005, 'A'), (854123, 100000007, 'A')
]).toDF(["user_id", "timestamp", "actions"])
Run Code Online (Sandbox Code Playgroud)

并按结果排序user_id,order你会得到:

+-------+-------+-----+-----+ 
|user_id|actions|order|count|
+-------+-------+-----+-----+
| 217498|      A|    1|    3|
| 217498|      B|    2|    1|
| 217498|      C|    3|    2|
| 217498|      A|    4|    1|
| 217498|      B|    5|    2|
| 854123|      A|    1|    2|
+-------+-------+-----+-----+
Run Code Online (Sandbox Code Playgroud)

  • 为什么你需要一个coalesce()? (2认同)