并行运行气流任务/ dags

Mr.*_*ent 8 python parallel-processing python-3.x airflow

我正在使用气流来编排一些python脚本.我有一个"主"dag,从中运行几个子标记.我的主要dag应该按照以下概述运行:

在此输入图像描述

我已经设法通过使用以下行在我的主要dag中获得此结构:

etl_internal_sub_dag1 >> etl_internal_sub_dag2 >> etl_internal_sub_dag3
etl_internal_sub_dag3 >> etl_adzuna_sub_dag
etl_internal_sub_dag3 >> etl_adwords_sub_dag
etl_internal_sub_dag3 >> etl_facebook_sub_dag
etl_internal_sub_dag3 >> etl_pagespeed_sub_dag

etl_adzuna_sub_dag >> etl_combine_sub_dag
etl_adwords_sub_dag >> etl_combine_sub_dag
etl_facebook_sub_dag >> etl_combine_sub_dag
etl_pagespeed_sub_dag >> etl_combine_sub_dag
Run Code Online (Sandbox Code Playgroud)

我想风做的是先运行etl_internal_sub_dag1然后etl_internal_sub_dag2,然后etl_internal_sub_dag3.当etl_internal_sub_dag3完成我想etl_adzuna_sub_dag,etl_adwords_sub_dag,etl_facebook_sub_dag,和etl_pagespeed_sub_dag并行运行.最后,当最后四个脚本完成后,我想要etl_combine_sub_dag运行.

然而,当我经营的主要DAG, ,etl_adzuna_sub_dag,etl_adwords_sub_dag,etl_facebook_sub_dagetl_pagespeed_sub_dag 正在运行一个接一个,而不是并行.

问:如何确保脚本etl_adzuna_sub_dag,etl_adwords_sub_dag,etl_facebook_sub_dag,和etl_pagespeed_sub_dag并行运行?

编辑:default_argsDAG这个样子:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date,
    'end_date': end_date,
    'email': ['myname@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

DAG_NAME = 'main_dag'

dag = DAG(DAG_NAME, default_args=default_args, catchup = False)
Run Code Online (Sandbox Code Playgroud)

小智 12

并行运行任务的一种简单解决方案是将它们放在 [ ] 括号中。例如 : task_start >> [task_get_users, task_get_posts, task_get_comments, task_get_todos]

欲了解更多信息,您可以阅读这篇 文章《towardsdatascience》


kax*_*xil 6

您将需要使用LocalExecutor

检查您的配置(airflow.cfg),您可能正在使用SequentialExectuor该配置连续执行任务。

Airflow使用后端数据库存储元数据。检查您的airflow.cfg文件并查找executor关键字。默认情况下,Airflow使用SequentialExecutor无论何种顺序执行任务的方法。因此,要允许Airflow在Parallel中运行任务,您将需要在Postges或MySQL中创建一个数据库,并在airflow.cfgsql_alchemy_connparam)中对其进行配置,然后将执行程序更改为LocalExecutorin airflow.cfg然后运行airflow initdb

请注意,在使用时,LocalExecutor您需要使用Postgres或MySQL而不是SQLite作为后端数据库。

更多信息:https : //airflow.incubator.apache.org/howto/initialize-database.html

如果要对Airflow进行真实的测试,则应考虑设置真实的数据库后端并切换到LocalExecutor。由于Airflow是使用强大的SqlAlchemy库构建的以便与其元数据进行交互,因此您应该能够将任何支持的数据库后端用作SqlAlchemy后端。我们建议使用MySQL或Postgres。

  • SQLite 一次仅支持 1 个连接。它不支持超过 1 个连接。因此,您需要使用不同的数据库,如 Postgres 或 MySQL。https://airflow.incubator.apache.org/howto/initialize-database.html - 检查我们推荐使用它的链接。我是 Airflow 项目的提交者之一,根据我的经验,Postgres 将是最佳选择。:) (5认同)
  • 不是子标签。Airflow 使用后端数据库来存储元数据。检查您的“airflow.cfg”文件并查找“executor”关键字。默认情况下,Airflow 使用“SequentialExecutor”,无论如何都会按顺序执行任务。因此,要允许 Airflow 并行运行任务,您需要在 Postges 或 MySQL 中创建一个数据库,并在“airflow.cfg”(“sql_alchemy_conn”参数)中配置它,然后将执行器更改为“LocalExecutor”。 (2认同)

小智 5

尝试:

etl_internal_sub_dag3 >> [etl_adzuna_sub_dag, etl_adwords_sub_dag, etl_facebook_sub_dag, etl_pagespeed_sub_dag]

[etl_adzuna_sub_dag, etl_adwords_sub_dag, etl_facebook_sub_dag, etl_pagespeed_sub_dag] >> etl_combine_sub_dag
Run Code Online (Sandbox Code Playgroud)