小编enm*_*myj的帖子

pyspark 时间序列数据的高性能滚动/窗口聚合

基本问题

我有一个大约有 100 亿行的数据集。我正在寻找最高效的方法来计算四个不同时间窗口(3 天、7 天、14 天、21 天)内的滚动/窗口聚合/指标(总和、平均值、最小值、最大值、stddev)。

Spark/AWS EMR 规格

Spark 版本:2.4.4
ec2 实例类型:r5.24xlarge
核心 ec2 实例数量:10
pyspark 分区数量:600

概述

我读了一堆 SO 帖子,这些帖子要么解决了计算滚动统计的机制,要么解决了如何使窗口函数更快。然而,没有一篇文章以解决我的问题的方式结合这两个概念。我在下面显示了一些选项,它们可以完成我想要的操作,但我需要它们在我的真实数据集上运行得更快,因此我正在寻找更快/更好的建议。

我的数据集的结构如下,但约有 100 亿行:

+--------------------------+----+-----+
|date                      |name|value|
+--------------------------+----+-----+
|2020-12-20 17:45:19.536796|1   |5    |
|2020-12-21 17:45:19.53683 |1   |105  |
|2020-12-22 17:45:19.536846|1   |205  |
|2020-12-23 17:45:19.536861|1   |305  |
|2020-12-24 17:45:19.536875|1   |405  |
|2020-12-25 17:45:19.536891|1   |505  |
|2020-12-26 17:45:19.536906|1   |605  |
|2020-12-20 17:45:19.536796|2   |10   |
|2020-12-21 17:45:19.53683 |2   |110  |
|2020-12-22 17:45:19.536846|2   |210  |
|2020-12-23 17:45:19.536861|2   |310  |
|2020-12-24 17:45:19.536875|2   |410 …
Run Code Online (Sandbox Code Playgroud)

window-functions apache-spark apache-spark-sql rolling-computation pyspark

6
推荐指数
1
解决办法
8717
查看次数

Airflow - 如何在 for 循环的迭代之间设置任务依赖关系?

我正在使用 Airflow 在 for 循环中运行一组任务。循环的目的是遍历数据库表名列表并执行以下操作:

for table_name in list_of_tables:
    if table exists in database (BranchPythonOperator)
        do nothing (DummyOperator)
    else:
        create table (JdbcOperator)
    insert records into table (JdbcOperator, Trigger on One Success)
Run Code Online (Sandbox Code Playgroud)

在 Web UI 上,这看起来像:

for循环中的任务

目前,Airflow 是从上到下然后从左到右执行此图中的任务,例如:tbl_exists_fake_table_one--> tbl_exists_fake_table_two-->tbl_create_fake_table_one等。

但是,insert声明 forfake_table_two取决于fake_table_one正在更新,Airflow 当前未捕获该依赖项。(从技术上讲,这种依赖关系是由 的顺序捕获的list_of_table_names,但我相信在更复杂的情况下这会容易出错)

我希望与fake_table_one运行相关的所有任务,然后是与fake_table_two. 如何在 Airflow 中完成此操作?

完整代码如下:

for tbl_name in list_of_table_names:

    # Check if table exists by querying information tables
    def has_table(tbl_name=tbl_name):
        p …
Run Code Online (Sandbox Code Playgroud)

python etl airflow

3
推荐指数
1
解决办法
5237
查看次数