lon*_*tar 5 sql apache-spark apache-spark-sql pyspark pyspark-sql
我有以下数据,其中数据按商店和月份ID进行分区,并按数量排序,以获取商店的主要供应商。
如果两个供应商之间的金额相等,则我需要一个平局决胜者,然后,如果捆绑的供应商中有一个是前几个月销售最多的供应商,则将该供应商设为该月的销售最多的供应商。
如果再次打成平手,则回头会增加。如果再打结,则1个月的延迟将不起作用。最坏的情况是上个月我们还会有更多重复项。
样本数据
val data = Seq((201801, 10941, 115, 80890.44900, 135799.66400),
(201801, 10941, 3, 80890.44900, 135799.66400) ,
(201712, 10941, 3, 517440.74500, 975893.79000),
(201712, 10941, 115, 517440.74500, 975893.79000),
(201711, 10941, 3 , 371501.92100, 574223.52300),
(201710, 10941, 115, 552435.57800, 746912.06700),
(201709, 10941, 115,1523492.60700,1871480.06800),
(201708, 10941, 115,1027698.93600,1236544.50900),
(201707, 10941, 33 ,1469219.86900,1622949.53000)
).toDF("MTH_ID", "store_id" ,"brand" ,"brndSales","TotalSales")
Run Code Online (Sandbox Code Playgroud)
码:
val window = Window.partitionBy("store_id","MTH_ID").orderBy("brndSales")
val res = data.withColumn("rank",rank over window)
Run Code Online (Sandbox Code Playgroud)
输出:
+------+--------+-----+-----------+-----------+----+
|MTH_ID|store_id|brand| brndSales| TotalSales|rank|
+------+--------+-----+-----------+-----------+----+
|201801| 10941| 115| 80890.449| 135799.664| 1|
|201801| 10941| 3| 80890.449| 135799.664| 1|
|201712| 10941| 3| 517440.745| 975893.79| 1|
|201712| 10941| 115| 517440.745| 975893.79| 1|
|201711| 10941| 115| 371501.921| 574223.523| 1|
|201710| 10941| 115| 552435.578| 746912.067| 1|
|201709| 10941| 115|1523492.607|1871480.068| 1|
|201708| 10941| 115|1027698.936|1236544.509| 1|
|201707| 10941| 33|1469219.869| 1622949.53| 1|
+------+--------+-----+-----------+-----------+----+
Run Code Online (Sandbox Code Playgroud)
我的排名是1和2的记录,但我的排名应该是1的第二个记录,基于上个月的最高金额
我期望以下输出。
+------+--------+-----+-----------+-----------+----+
|MTH_ID|store_id|brand| brndSales| TotalSales|rank|
+------+--------+-----+-----------+-----------+----+
|201801| 10941| 115| 80890.449| 135799.664| 2|
|201801| 10941| 3| 80890.449| 135799.664| 1|
|201712| 10941| 3| 517440.745| 975893.79| 1|
|201712| 10941| 115| 517440.745| 975893.79| 1|
|201711| 10941| 3| 371501.921| 574223.523| 1|
|201710| 10941| 115| 552435.578| 746912.067| 1|
|201709| 10941| 115|1523492.607|1871480.068| 1|
|201708| 10941| 115|1027698.936|1236544.509| 1|
|201707| 10941| 33|1469219.869| 1622949.53| 1|
+------+--------+-----+-----------+-----------+----+
Run Code Online (Sandbox Code Playgroud)
我应该写UDAF吗?任何建议都会有所帮助。
您可以使用 2 个窗口来完成此操作。首先,您需要使用 lag() 函数来继承上个月的销售值,以便您可以在排名窗口中使用它。这是 pyspark 中的部分:
lag_window = Window.partitionBy("store_id", "brand").orderBy("MTH_ID")
lag_df = data.withColumn("last_month_sales", lag("brndSales").over(lag_window))
Run Code Online (Sandbox Code Playgroud)
然后编辑窗口以包含该新列:
window = Window.partitionBy("store_id","MTH_ID").orderBy("brndSales", "last_month_sales")
lag_df.withColumn("rank",rank().over(window)).show()
+------+--------+-----+-----------+-----------+----------------+----+
|MTH_ID|store_id|brand| brndSales| TotalSales|last_month_sales|rank|
+------+--------+-----+-----------+-----------+----------------+----+
|201711| 10941| 99| 371501.921| 574223.523| null| 1|
|201709| 10941| 115|1523492.607|1871480.068| 1027698.936| 1|
|201707| 10941| 33|1469219.869| 1622949.53| null| 1|
|201708| 10941| 115|1027698.936|1236544.509| null| 1|
|201710| 10941| 115| 552435.578| 746912.067| 1523492.607| 1|
|201712| 10941| 3| 517440.745| 975893.79| null| 1|
|201801| 10941| 3| 80890.449| 135799.664| 517440.745| 1|
|201801| 10941| 115| 80890.449| 135799.664| 552435.578| 2|
+------+--------+-----+-----------+-----------+----------------+----+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
215 次 |
| 最近记录: |