季度迄今增长

sta*_*ckq 8 python-3.x apache-spark apache-spark-sql pyspark

我在 a 中有一些每日数据df,可以追溯到 2020 年 1 月 1 日。它看起来与下面类似,但id1每天都有很多s。

| yyyy_mm_dd | id1 | id2  | cost  |
|------------|-----|------|-------|
| 2020-01-01 | 23  | 7253 | 5003  |
| 2020-01-01 | 23  | 7743 | 30340 |
| 2020-01-02 | 23  | 7253 | 450   |
| 2020-01-02 | 23  | 7743 | 4500  |
| ...        | ... | ...  | ...   |
| 2021-01-01 | 23  | 7253 | 5675  |
| 2021-01-01 | 23  | 134  | 1030  |
| 2021-01-01 | 23  | 3445 | 564   |
| 2021-01-01 | 23  | 4534 | 345   |
| ...        | ... | ...  | ...   |
Run Code Online (Sandbox Code Playgroud)

我已经对总成本进行了分组和计算,如下所示:

grouped_quarterly = (
    df
    .withColumn('year_quarter', (F.year(F.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd'))
    .groupby('id1', 'year_quarter')
    .agg(
        F.sum('cost').alias('cost')
    )
)
Run Code Online (Sandbox Code Playgroud)

然后我能够成功地进行季度比较,如下所示:

w = Window.partitionBy(F.col('id1'), F.expr('substring(string(year_quarter), -2)')).orderBy('year_quarter')
growth = (
    grouped_quarterly
    .withColumn('prev_value', F.lag(F.col('cost')).over(w))
    .withColumn('diff', F.when(F.isnull(F.col('cost') - F.col('prev_value')), 0).otherwise(F.col('cost') - F.col('prev_value')))
).where(F.col('year_quarter') >= 202101)
Run Code Online (Sandbox Code Playgroud)

我想将其修改为季度至今而不是季度。例如,上面将比较 2020 年 4 月 1 日 - 2020 年 6 月 30 日与 2020 年 4 月 1 日 - 2021 年 4 月 15 日(或 df 中的任何最大日期)。

相反,我更愿意将 2020 年 4 月 1 日 - 2020 年 4 月 15 日与 2021 年 4 月 1 日 - 2021 年 4 月 15 日进行比较。

是否可以确保在 year_quarter 内仅比较相同的期间?

编辑:添加示例输出:


grouped_quarterly.where(F.col('id1') == 222).sort('year_quarter').show(10,False)

| id1 | year_quarter | cost  |
|-----|--------------|-------|
| 222 | 202001       | 49428 |
| 222 | 202002       | 43292 |
| 222 | 202003       | 73928 |
| 222 | 202004       | 12028 |
| 222 | 202101       | 19382 |
| 222 | 202102       | 4282  |

growth.where(F.col('id1') == 222).sort('year_quarter').show(10,False)

| id1 | year_quarter | cost  | prev_value | diff   | growth |
|-----|--------------|-------|------------|--------|--------|
| 222 | 202101       | 52494 | 49428      | 3066   | 6.20   |
| 222 | 202102       | 4282  | 43292      | -39010 | -90.10 |
Run Code Online (Sandbox Code Playgroud)

窗口中的增长计算正在正确完成。但是,由于 202102 正在进行中,因此将它与完整的 202002 进行比较。 202101 的比较非常有效,因为两个 year_quarters 都已完成。

无论如何,对于不完整的季度,是否可以确保窗口函数仅将 year_quarter 内的同期与上一年进行比较?我希望样本数据能让我的问题更清楚一点

wer*_*ner 1

这个想法是将任务分为两部分:

  1. 计算整个季度的增长。这个逻辑完全从问题中接管,然后
  2. 计算当前季度的增长。

首先生成2019Q2、2020Q2和2021Q2的一些额外测试数据:

data = [('2019-04-01', 23, 1), ('2019-04-01', 23, 2), ('2019-04-02', 23, 3), ('2019-04-15', 23, 4),
        ('2019-04-16', 23, 5), ('2019-04-17', 23, 6), ('2019-05-01', 23, 7), ('2019-06-30', 23, 8),
        ('2019-07-01', 23, 9), ('2020-01-01',23,5003),('2020-01-01',23,30340), ('2020-01-02',23,450),
        ('2020-01-02',23,4500), ('2020-04-01', 23, 10), ('2020-04-01', 23, 20), ('2020-04-02', 23, 30),
        ('2020-04-15', 23, 40), ('2020-04-16', 23, 50), ('2020-04-17', 23, 60), ('2020-05-01', 23, 70),
        ('2020-06-30', 23, 80), ('2020-07-01', 23, 90), ('2021-01-01',23,5675), ('2021-01-01',23,1030),
        ('2021-01-01',23,564), ('2021-01-01',23,345), ('2021-04-01', 23, -10), ('2021-04-01', 23, -20),
        ('2021-04-02', 23, -30), ('2021-04-15', 23, -40)]
Run Code Online (Sandbox Code Playgroud)

计算year_quarter列并缓存结果:

df = spark.createDataFrame(data=data, schema = ["yyyy_mm_dd", "id1", "cost"]) \
    .withColumn("yyyy_mm_dd", F.to_date("yyyy_mm_dd", "yyyy-MM-dd")) \
    .withColumn('year_quarter', (F.year(F.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd')))) \
    .cache()
Run Code Online (Sandbox Code Playgroud)

获取最大日期及其对应的季度:

max_row = df.selectExpr("max(yyyy_mm_dd)", "max_by(year_quarter, yyyy_mm_dd)").head()
cur_date, cur_quarter = max_row[0], max_row[1]
Run Code Online (Sandbox Code Playgroud)

并非严格需要设置cur_date为数据的最大日期。相反cur_datecur_quarter可以手动设置。

对于除当前季度之外的所有季度,都应用问题中给出的逻辑:

w = Window.partitionBy(F.col('id1'), F.expr('substring(string(year_quarter), -2)')).orderBy('year_quarter')
df_full_quarters = df.filter(f"year_quarter <> {cur_quarter}") \
    .groupby('id1', 'year_quarter') \
    .agg(F.sum('cost').alias('cost')) \
    .withColumn('prev_value', F.lag(F.col('cost')).over(w))
Run Code Online (Sandbox Code Playgroud)

对于当前季度,过滤掉上一年中应忽略的所有日期:

df_cur_quarter = df.filter(f"year_quarter = {cur_quarter} or (year_quarter = {cur_quarter - 100} and add_months(yyyy_mm_dd, 12) <= '{cur_date}')") \
    .groupby('id1', 'year_quarter') \
    .agg(F.sum('cost').alias('cost')) \
    .withColumn('prev_value', F.lag(F.col('cost')).over(w)) \
    .filter(f"year_quarter = {cur_quarter}")
Run Code Online (Sandbox Code Playgroud)

最后合并两部分并计算diff列:

growth = df_full_quarters.union(df_cur_quarter) \
    .withColumn('diff', F.when(F.isnull(F.col('cost') - F.col('prev_value')), 0).otherwise(F.col('cost') - F.col('prev_value'))) \
    .orderBy("id1", "year_quarter")
Run Code Online (Sandbox Code Playgroud)

结果将是:

+---+------------+-----+----------+------+                                      
|id1|year_quarter| cost|prev_value|  diff|
+---+------------+-----+----------+------+
| 23|      201902|   36|      null|     0|
| 23|      201903|    9|      null|     0|
| 23|      202001|40293|      null|     0|
| 23|      202002|  360|        36|   324|
| 23|      202003|   90|         9|    81|
| 23|      202101| 7614|     40293|-32679|
| 23|      202102| -100|       100|  -200|
+---+------------+-----+----------+------+
Run Code Online (Sandbox Code Playgroud)

在此示例中,为了将 2021Q2 与上一年进行比较,2020Q2 的总和指定为 100,但整个 2020Q2 的实际值为 360。