PySpark 数据框的每日预测

Lui*_*ola 2 sql window-functions apache-spark apache-spark-sql pyspark

我在 PySpark 中有以下数据框:

DT_BORD_REF:该月的日期列
REF_DATE: 过去和未来的当前日期参考
PROD_ID: 产品 ID
COMPANY_CODE: 公司 ID
CUSTOMER_CODE: 客户 ID
MTD_WD: 本月至今的工作日计数(日期 = DT_BORD_REF)
QUANTITY: 已售
QTE_MTD商品数 : 本月至本月的商品数日期

+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|        DT_BORD_REF|           REF_DATE|          PROD_ID|COMPANY_CODE|CUSTOMER_CODE|COUNTRY_ALPHA|MTD_WD|QUANTITY|QTE_MTD|
+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|2020-11-02 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     1|     4.0|    4.0|
|2020-11-05 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     3|    null|    4.0|
|2020-11-06 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     4|    null|    4.0|
|2020-11-09 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     5|    null|    4.0|
|2020-11-10 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     6|    null|    4.0|
|2020-11-11 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     7|    null|    4.0|
|2020-11-12 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     8|    null|    4.0|
|2020-11-13 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     9|    null|    4.0|
|2020-11-16 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    10|    null|    4.0|
|2020-11-17 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    11|    null|    4.0|
|2020-11-18 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    12|    null|    4.0|
|2020-11-19 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    13|    null|    4.0|
|2020-11-20 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    14|    null|    4.0|
|2020-11-23 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    15|    null|    4.0|
|2020-11-24 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    16|    null|    4.0|
|2020-11-25 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    17|    null|    4.0|
Run Code Online (Sandbox Code Playgroud)

因为DT_BORD_REF < REF_DATE所有行都是实际销售额,不一定每个工作日都会发生。有时也会发生在非工作日。

因为DT_BORD_REF >= REF_DATE没有销售(这是未来)

其目的是预测使用公式今后所有行的销售:QTE_MTD/MTD_WD计算出的REF_DATE每一个产品,客户和国家。

QTE_MTD 是使用窗口函数从 QUANTITY 列计算的。我需要除以对于MTD_WDREF_DATE这本例为3

我如何与增加一列MTD_WDREF_DATE的产品,客户和国家划分?

换句话说,我需要为每个产品、客户和国家/地区添加一个列,其中第一次出现MTD_WD条件DT_BORD_REF > REF_DATE满足的时间(同样,在本示例中为 3)。

该数据集针对不同的产品、客户和国家/地区有数百万行 工作日由国家/地区提供

希望它很清楚:)

mck*_*mck 6

您可以使用firstwith ignorenulls=True,并when在适当的条件下,获得第一个MTD_WDwhere DT_BORD_REF> REF_DATE

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'val',
    F.first(
        F.when(
            F.col('DT_BORD_REF') > F.col('REF_DATE'),
            F.col('MTD_WD')
        ), 
        ignorenulls=True
    ).over(
        Window.partitionBy('PROD_ID','COMPANY_CODE','CUSTOMER_CODE','COUNTRY_ALPHA')
              .orderBy('DT_BORD_REF')
              .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )
)
Run Code Online (Sandbox Code Playgroud)