小编Ber*_*sen的帖子

如何使用 Python 在 Airflow 中的另一个 DAG 成功时触发 DAG?

我有一个 python DAGParent Job和 DAG Child Job。中的任务Child Job应该在成功完成Parent Job每天运行的任务时触发。如何添加外部作业触发器?

我的代码

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')

execute_notebook = PostgresOperator(
  task_id='data_sql',
  postgres_conn_id='REDSHIFT_CONN',
  sql="SELECT * FROM athena_rs.shipments limit 5",
  dag=dag
)
Run Code Online (Sandbox Code Playgroud)

python directed-acyclic-graphs python-3.x airflow airflow-scheduler

14
推荐指数
2
解决办法
1万
查看次数