在其他字段上使用窗口功能领带断路器,以获取最新记录

lon*_*tar 5 sql apache-spark apache-spark-sql pyspark pyspark-sql

我有以下数据,其中数据按商店和月份ID进行分区,并按数量排序,以获取商店的主要供应商。

如果两个供应商之间的金额相等,则我需要一个平局决胜者,然后,如果捆绑的供应商中有一个是前几个月销售最多的供应商,则将该供应商设为该月的销售最多的供应商。

如果再次打成平手,则回头会增加。如果再打结,则1个月的延迟将不起作用。最坏的情况是上个月我们还会有更多重复项。

样本数据

val data = Seq((201801,      10941,            115,  80890.44900, 135799.66400),
               (201801,      10941,            3,  80890.44900, 135799.66400) ,
               (201712,      10941,            3, 517440.74500, 975893.79000),
               (201712,      10941,            115, 517440.74500, 975893.79000),
               (201711,      10941,            3 , 371501.92100, 574223.52300),
               (201710,      10941,            115, 552435.57800, 746912.06700),
               (201709,      10941,            115,1523492.60700,1871480.06800),
               (201708,      10941,            115,1027698.93600,1236544.50900),
               (201707,      10941,            33 ,1469219.86900,1622949.53000)
               ).toDF("MTH_ID", "store_id" ,"brand" ,"brndSales","TotalSales")
Run Code Online (Sandbox Code Playgroud)

码:

val window = Window.partitionBy("store_id","MTH_ID").orderBy("brndSales")
val res = data.withColumn("rank",rank over window)
Run Code Online (Sandbox Code Playgroud)

输出:

    +------+--------+-----+-----------+-----------+----+
 |MTH_ID|store_id|brand|  brndSales| TotalSales|rank|
+------+--------+-----+-----------+-----------+----+
|201801|   10941|  115|  80890.449| 135799.664|   1|
|201801|   10941|    3|  80890.449| 135799.664|   1|
|201712|   10941|    3| 517440.745|  975893.79|   1|
|201712|   10941|  115| 517440.745|  975893.79|   1|
|201711|   10941|  115| 371501.921| 574223.523|   1|
|201710|   10941|  115| 552435.578| 746912.067|   1|
|201709|   10941|  115|1523492.607|1871480.068|   1|
|201708|   10941|  115|1027698.936|1236544.509|   1|
|201707|   10941|   33|1469219.869| 1622949.53|   1|
+------+--------+-----+-----------+-----------+----+
Run Code Online (Sandbox Code Playgroud)

我的排名是1和2的记录,但我的排名应该是1的第二个记录,基于上个月的最高金额

我期望以下输出。

    +------+--------+-----+-----------+-----------+----+
    |MTH_ID|store_id|brand|  brndSales| TotalSales|rank|
    +------+--------+-----+-----------+-----------+----+
    |201801|   10941|  115|  80890.449| 135799.664|   2|
    |201801|   10941|    3|  80890.449| 135799.664|   1|
    |201712|   10941|    3| 517440.745|  975893.79|   1|
    |201712|   10941|  115| 517440.745|  975893.79|   1|
    |201711|   10941|    3| 371501.921| 574223.523|   1|
    |201710|   10941|  115| 552435.578| 746912.067|   1|
    |201709|   10941|  115|1523492.607|1871480.068|   1|
    |201708|   10941|  115|1027698.936|1236544.509|   1|
    |201707|   10941|   33|1469219.869| 1622949.53|   1|
    +------+--------+-----+-----------+-----------+----+
Run Code Online (Sandbox Code Playgroud)

我应该写UDAF吗?任何建议都会有所帮助。

Dav*_*ler 3

您可以使用 2 个窗口来完成此操作。首先,您需要使用 lag() 函数来继承上个月的销售值,以便您可以在排名窗口中使用它。这是 pyspark 中的部分:

lag_window = Window.partitionBy("store_id", "brand").orderBy("MTH_ID")
lag_df = data.withColumn("last_month_sales", lag("brndSales").over(lag_window))
Run Code Online (Sandbox Code Playgroud)

然后编辑窗口以包含该新列:

window = Window.partitionBy("store_id","MTH_ID").orderBy("brndSales", "last_month_sales")
lag_df.withColumn("rank",rank().over(window)).show()
+------+--------+-----+-----------+-----------+----------------+----+
|MTH_ID|store_id|brand|  brndSales| TotalSales|last_month_sales|rank|
+------+--------+-----+-----------+-----------+----------------+----+
|201711|   10941|   99| 371501.921| 574223.523|            null|   1|
|201709|   10941|  115|1523492.607|1871480.068|     1027698.936|   1|
|201707|   10941|   33|1469219.869| 1622949.53|            null|   1|
|201708|   10941|  115|1027698.936|1236544.509|            null|   1|
|201710|   10941|  115| 552435.578| 746912.067|     1523492.607|   1|
|201712|   10941|    3| 517440.745|  975893.79|            null|   1|
|201801|   10941|    3|  80890.449| 135799.664|      517440.745|   1|
|201801|   10941|  115|  80890.449| 135799.664|      552435.578|   2|
+------+--------+-----+-----------+-----------+----------------+----+
Run Code Online (Sandbox Code Playgroud)