Mr.*_*ent 8 python parallel-processing python-3.x airflow
我正在使用气流来编排一些python脚本.我有一个"主"dag,从中运行几个子标记.我的主要dag应该按照以下概述运行:
我已经设法通过使用以下行在我的主要dag中获得此结构:
etl_internal_sub_dag1 >> etl_internal_sub_dag2 >> etl_internal_sub_dag3
etl_internal_sub_dag3 >> etl_adzuna_sub_dag
etl_internal_sub_dag3 >> etl_adwords_sub_dag
etl_internal_sub_dag3 >> etl_facebook_sub_dag
etl_internal_sub_dag3 >> etl_pagespeed_sub_dag
etl_adzuna_sub_dag >> etl_combine_sub_dag
etl_adwords_sub_dag >> etl_combine_sub_dag
etl_facebook_sub_dag >> etl_combine_sub_dag
etl_pagespeed_sub_dag >> etl_combine_sub_dag
我想风做的是先运行etl_internal_sub_dag1然后etl_internal_sub_dag2,然后etl_internal_sub_dag3.当etl_internal_sub_dag3完成我想etl_adzuna_sub_dag,etl_adwords_sub_dag,etl_facebook_sub_dag,和etl_pagespeed_sub_dag并行运行.最后,当最后四个脚本完成后,我想要etl_combine_sub_dag运行.
然而,当我经营的主要DAG, ,etl_adzuna_sub_dag,etl_adwords_sub_dag,etl_facebook_sub_dag和etl_pagespeed_sub_dag  正在运行一个接一个,而不是并行.
问:如何确保脚本etl_adzuna_sub_dag,etl_adwords_sub_dag,etl_facebook_sub_dag,和etl_pagespeed_sub_dag并行运行?
编辑:我default_args和DAG这个样子:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date,
    'end_date': end_date,
    'email': ['myname@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}
DAG_NAME = 'main_dag'
dag = DAG(DAG_NAME, default_args=default_args, catchup = False)
小智 12
并行运行任务的一种简单解决方案是将它们放在 [ ] 括号中。例如 :
task_start >> [task_get_users, task_get_posts, task_get_comments, task_get_todos]
欲了解更多信息,您可以阅读这篇 文章《towardsdatascience》
您将需要使用LocalExecutor。
检查您的配置(airflow.cfg),您可能正在使用SequentialExectuor该配置连续执行任务。
Airflow使用后端数据库存储元数据。检查您的airflow.cfg文件并查找executor关键字。默认情况下,Airflow使用SequentialExecutor无论何种顺序执行任务的方法。因此,要允许Airflow在Parallel中运行任务,您将需要在Postges或MySQL中创建一个数据库,并在airflow.cfg(sql_alchemy_connparam)中对其进行配置,然后将执行程序更改为LocalExecutorin airflow.cfg然后运行airflow initdb。
请注意,在使用时,LocalExecutor您需要使用Postgres或MySQL而不是SQLite作为后端数据库。
更多信息:https : //airflow.incubator.apache.org/howto/initialize-database.html
如果要对Airflow进行真实的测试,则应考虑设置真实的数据库后端并切换到LocalExecutor。由于Airflow是使用强大的SqlAlchemy库构建的以便与其元数据进行交互,因此您应该能够将任何支持的数据库后端用作SqlAlchemy后端。我们建议使用MySQL或Postgres。
小智 5
尝试:
etl_internal_sub_dag3 >> [etl_adzuna_sub_dag, etl_adwords_sub_dag, etl_facebook_sub_dag, etl_pagespeed_sub_dag]
[etl_adzuna_sub_dag, etl_adwords_sub_dag, etl_facebook_sub_dag, etl_pagespeed_sub_dag] >> etl_combine_sub_dag
| 归档时间: | 
 | 
| 查看次数: | 4226 次 | 
| 最近记录: |