sta*_*ckq 8 python-3.x apache-spark apache-spark-sql pyspark
我在 a 中有一些每日数据df,可以追溯到 2020 年 1 月 1 日。它看起来与下面类似,但id1每天都有很多s。
| yyyy_mm_dd | id1 | id2 | cost |
|------------|-----|------|-------|
| 2020-01-01 | 23 | 7253 | 5003 |
| 2020-01-01 | 23 | 7743 | 30340 |
| 2020-01-02 | 23 | 7253 | 450 |
| 2020-01-02 | 23 | 7743 | 4500 |
| ... | ... | ... | ... |
| 2021-01-01 | 23 | 7253 | 5675 |
| 2021-01-01 | 23 | 134 | 1030 |
| 2021-01-01 | 23 | 3445 | 564 |
| 2021-01-01 | 23 | 4534 | 345 |
| ... | ... | ... | ... |
Run Code Online (Sandbox Code Playgroud)
我已经对总成本进行了分组和计算,如下所示:
grouped_quarterly = (
df
.withColumn('year_quarter', (F.year(F.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd'))
.groupby('id1', 'year_quarter')
.agg(
F.sum('cost').alias('cost')
)
)
Run Code Online (Sandbox Code Playgroud)
然后我能够成功地进行季度比较,如下所示:
w = Window.partitionBy(F.col('id1'), F.expr('substring(string(year_quarter), -2)')).orderBy('year_quarter')
growth = (
grouped_quarterly
.withColumn('prev_value', F.lag(F.col('cost')).over(w))
.withColumn('diff', F.when(F.isnull(F.col('cost') - F.col('prev_value')), 0).otherwise(F.col('cost') - F.col('prev_value')))
).where(F.col('year_quarter') >= 202101)
Run Code Online (Sandbox Code Playgroud)
我想将其修改为季度至今而不是季度。例如,上面将比较 2020 年 4 月 1 日 - 2020 年 6 月 30 日与 2020 年 4 月 1 日 - 2021 年 4 月 15 日(或 df 中的任何最大日期)。
相反,我更愿意将 2020 年 4 月 1 日 - 2020 年 4 月 15 日与 2021 年 4 月 1 日 - 2021 年 4 月 15 日进行比较。
是否可以确保在 year_quarter 内仅比较相同的期间?
编辑:添加示例输出:
grouped_quarterly.where(F.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost |
|-----|--------------|-------|
| 222 | 202001 | 49428 |
| 222 | 202002 | 43292 |
| 222 | 202003 | 73928 |
| 222 | 202004 | 12028 |
| 222 | 202101 | 19382 |
| 222 | 202102 | 4282 |
growth.where(F.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost | prev_value | diff | growth |
|-----|--------------|-------|------------|--------|--------|
| 222 | 202101 | 52494 | 49428 | 3066 | 6.20 |
| 222 | 202102 | 4282 | 43292 | -39010 | -90.10 |
Run Code Online (Sandbox Code Playgroud)
窗口中的增长计算正在正确完成。但是,由于 202102 正在进行中,因此将它与完整的 202002 进行比较。 202101 的比较非常有效,因为两个 year_quarters 都已完成。
无论如何,对于不完整的季度,是否可以确保窗口函数仅将 year_quarter 内的同期与上一年进行比较?我希望样本数据能让我的问题更清楚一点
这个想法是将任务分为两部分:
首先生成2019Q2、2020Q2和2021Q2的一些额外测试数据:
data = [('2019-04-01', 23, 1), ('2019-04-01', 23, 2), ('2019-04-02', 23, 3), ('2019-04-15', 23, 4),
('2019-04-16', 23, 5), ('2019-04-17', 23, 6), ('2019-05-01', 23, 7), ('2019-06-30', 23, 8),
('2019-07-01', 23, 9), ('2020-01-01',23,5003),('2020-01-01',23,30340), ('2020-01-02',23,450),
('2020-01-02',23,4500), ('2020-04-01', 23, 10), ('2020-04-01', 23, 20), ('2020-04-02', 23, 30),
('2020-04-15', 23, 40), ('2020-04-16', 23, 50), ('2020-04-17', 23, 60), ('2020-05-01', 23, 70),
('2020-06-30', 23, 80), ('2020-07-01', 23, 90), ('2021-01-01',23,5675), ('2021-01-01',23,1030),
('2021-01-01',23,564), ('2021-01-01',23,345), ('2021-04-01', 23, -10), ('2021-04-01', 23, -20),
('2021-04-02', 23, -30), ('2021-04-15', 23, -40)]
Run Code Online (Sandbox Code Playgroud)
计算year_quarter列并缓存结果:
df = spark.createDataFrame(data=data, schema = ["yyyy_mm_dd", "id1", "cost"]) \
.withColumn("yyyy_mm_dd", F.to_date("yyyy_mm_dd", "yyyy-MM-dd")) \
.withColumn('year_quarter', (F.year(F.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd')))) \
.cache()
Run Code Online (Sandbox Code Playgroud)
获取最大日期及其对应的季度:
max_row = df.selectExpr("max(yyyy_mm_dd)", "max_by(year_quarter, yyyy_mm_dd)").head()
cur_date, cur_quarter = max_row[0], max_row[1]
Run Code Online (Sandbox Code Playgroud)
并非严格需要设置cur_date为数据的最大日期。相反cur_date和cur_quarter可以手动设置。
对于除当前季度之外的所有季度,都应用问题中给出的逻辑:
w = Window.partitionBy(F.col('id1'), F.expr('substring(string(year_quarter), -2)')).orderBy('year_quarter')
df_full_quarters = df.filter(f"year_quarter <> {cur_quarter}") \
.groupby('id1', 'year_quarter') \
.agg(F.sum('cost').alias('cost')) \
.withColumn('prev_value', F.lag(F.col('cost')).over(w))
Run Code Online (Sandbox Code Playgroud)
对于当前季度,过滤掉上一年中应忽略的所有日期:
df_cur_quarter = df.filter(f"year_quarter = {cur_quarter} or (year_quarter = {cur_quarter - 100} and add_months(yyyy_mm_dd, 12) <= '{cur_date}')") \
.groupby('id1', 'year_quarter') \
.agg(F.sum('cost').alias('cost')) \
.withColumn('prev_value', F.lag(F.col('cost')).over(w)) \
.filter(f"year_quarter = {cur_quarter}")
Run Code Online (Sandbox Code Playgroud)
最后合并两部分并计算diff列:
growth = df_full_quarters.union(df_cur_quarter) \
.withColumn('diff', F.when(F.isnull(F.col('cost') - F.col('prev_value')), 0).otherwise(F.col('cost') - F.col('prev_value'))) \
.orderBy("id1", "year_quarter")
Run Code Online (Sandbox Code Playgroud)
结果将是:
+---+------------+-----+----------+------+
|id1|year_quarter| cost|prev_value| diff|
+---+------------+-----+----------+------+
| 23| 201902| 36| null| 0|
| 23| 201903| 9| null| 0|
| 23| 202001|40293| null| 0|
| 23| 202002| 360| 36| 324|
| 23| 202003| 90| 9| 81|
| 23| 202101| 7614| 40293|-32679|
| 23| 202102| -100| 100| -200|
+---+------------+-----+----------+------+
Run Code Online (Sandbox Code Playgroud)
在此示例中,为了将 2021Q2 与上一年进行比较,2020Q2 的总和指定为 100,但整个 2020Q2 的实际值为 360。
| 归档时间: |
|
| 查看次数: |
184 次 |
| 最近记录: |