How to get the second highest value from a PySpark column?

Jit*_*ddi 5 python group-by dataframe apache-spark pyspark

I have a PySpark DataFrame, and I want to get the second highest value of ORDERED_TIME (a datetime field in yyyy-mm-dd format) after applying groupBy on 2 columns, CUSTOMER_ID and ADDRESS_ID.

A customer can have many orders associated with an address, and I want to get the second most recent order for each (customer, address) pair.

My approach was to make a window, partition by CUSTOMER_ID and ADDRESS_ID, and sort by ORDERED_TIME.

sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(col('ORDERED_TIME').desc())

df2 = df2.withColumn("second_recent_order", (df2.select("ORDERED_TIME").collect()[1]).over(sorted_order_times))

However, I get the error ValueError: 'over' is not in list

Can anyone suggest the right way to solve this?

Please let me know if any other information is needed.

Sample data

+-----------+----------+-------------------+
|USER_ID    |ADDRESS_ID|       ORDER DATE  | 
+-----------+----------+-------------------+
|        100| 1000     |2021-01-02         |
|        100| 1000     |2021-01-14         |
|        100| 1000     |2021-01-03         |
|        100| 1000     |2021-01-04         |
|        101| 2000     |2020-05-07         |
|        101| 2000     |2021-04-14         |
+-----------+----------+-------------------+

Expected output

+-----------+----------+-------------------+-------------------+
|USER_ID    |ADDRESS_ID|       ORDER DATE  |second_recent_order|
+-----------+----------+-------------------+-------------------+
|        100| 1000     |2021-01-02         |2021-01-04         |
|        100| 1000     |2021-01-14         |2021-01-04         |
|        100| 1000     |2021-01-03         |2021-01-04         |
|        100| 1000     |2021-01-04         |2021-01-04         |
|        101| 2000     |2020-05-07         |2020-05-07         |
|        101| 2000     |2021-04-14         |2020-05-07         |
+-----------+----------+-------------------+-------------------+

小智 5

Here is another approach, using collect_list:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Sort each (CUSTOMER_ID, ADDRESS_ID) partition by ORDERED_TIME descending,
# and extend the frame over the whole partition so every row sees all dates.
sorted_order_times = (
    Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
    .orderBy(F.col("ORDERED_TIME").desc())
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# collect_list gathers the dates in descending order, so index [1]
# is the second most recent order per partition.
df2 = df.withColumn(
    "second_recent_order",
    F.collect_list(F.col("ORDERED_TIME")).over(sorted_order_times)[1],
)
df2.show()

Final output
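The core logic here, stripped of Spark, is just "group the rows, sort each group's dates descending, take index 1." As a sanity check of that logic against the sample data above, here is a minimal plain-Python sketch (no Spark required; row values are taken from the question's sample table):

```python
from collections import defaultdict

# Sample rows from the question: (USER_ID, ADDRESS_ID, ORDER DATE)
rows = [
    (100, 1000, "2021-01-02"),
    (100, 1000, "2021-01-14"),
    (100, 1000, "2021-01-03"),
    (100, 1000, "2021-01-04"),
    (101, 2000, "2020-05-07"),
    (101, 2000, "2021-04-14"),
]

# Group order dates by (user, address) -- the analogue of partitionBy.
groups = defaultdict(list)
for user_id, address_id, order_date in rows:
    groups[(user_id, address_id)].append(order_date)

# Sort each group's dates descending (yyyy-mm-dd strings sort correctly)
# and take index 1, i.e. the second most recent order.
second_recent = {
    key: sorted(dates, reverse=True)[1]
    for key, dates in groups.items()
}

print(second_recent)
# {(100, 1000): '2021-01-04', (101, 2000): '2020-05-07'}
```

This matches the expected output column: 2021-01-04 for (100, 1000) and 2020-05-07 for (101, 2000). Note that, like the collect_list answer, indexing [1] assumes every group has at least two orders; a group with a single order would yield None in Spark (or an IndexError in this sketch).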