相关疑难解决方法(0)

如何在Airflow中设置DAG之间的依赖关系?

我正在使用Airflow来安排批处理作业.我有一个每晚运行的DAG(A)和每月运行一次的另一个DAG(B).B取决于A已成功完成.但是B需要很长时间才能运行,因此我希望将其保存在单独的DAG中以便更好地进行SLA报告.

如何在同一天成功运行DAG A,使DAG B运行?

python etl airflow

35
推荐指数
2
解决办法
2万
查看次数

气流触发规则"all_done"和"all_success"之间有什么区别?

我正在处理的工作流程中的一个要求是等待某个事件在给定时间内发生,如果没有发生则将任务标记为失败仍然应该执行下游任务.

我想知道"all_done"是否意味着所有依赖任务都已完成,无论它们是否成功.

airflow

12
推荐指数
3
解决办法
1万
查看次数

将顶级DAG连接在一起

我需要有几个相同(只有在不同参数)的顶级DAG小号那也可以用以下限制/假设一起触发:

  • 各个顶级DAG将会拥有,schedule_interval=None因为它们仅需要偶尔的手动触发
  • 但是,一系列DAG需要每天运行
  • 序列中DAG的顺序数量是固定的(在编写代码之前已知),并且很少更改(几个月内一次)
  • 无论DAG是失败还是成功,触发链都不得中断
  • 当前,它们必须串联运行。将来他们可能需要并行触发

因此,我为dags目录中的每个DAG创建了一个文件,现在必须将它们连接起来以便顺序执行。我确定了两种方法可以完成此操作:

  1. SubDagOperator

  2. TriggerDagRunOperator

    • 可在我的演示中使用,并行运行(不按顺序运行),因为它不等待触发的DAG完成才移至下一个
    • ExternalTaskSensor 可能有助于克服上述限制,但会使事情变得很混乱

我的问题是

  • 如何克服的局限性parent_id前缀dag_idSubDagS'
  • 如何迫使TriggerDagRunOperatorS 等待DAG完成
  • 是否有其他替代/更好的方法可以独立的(顶级)DAG连接在一起?
  • 我为每个顶级DAG 创建单独文件(仅在输入方面有所不同的DAG)的方法是否有解决方法?

我正在使用puckel / …

airflow

6
推荐指数
1
解决办法
1073
查看次数

获取所有气流叶节点/任务

我想构建一些我需要捕获所有叶子任务并为它们添加下游依赖项以在我们的数据库中完成作业的东西.有没有一种简单的方法可以在Airflow中找到DAG的所有叶节点?

python airflow apache-airflow

5
推荐指数
1
解决办法
589
查看次数

仅当 AWS athena 表中的新分区/数据可用时,如何使用 python 中的 DAG 触发 Airflow 任务?

我有一个像下面这样的场景:

  1. 仅当源表 (Athena) 中有新数据可用时才触发 aTask 1和。Task 2当一天中有新的数据分区时,应该触发任务 1 和任务 2。
  2. 仅在和Task 3完成时触发Task 1Task 2
  3. 仅触发Task 4完成Task 3

在此输入图像描述

我的代码

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, …
Run Code Online (Sandbox Code Playgroud)

python directed-acyclic-graphs airflow amazon-athena airflow-scheduler

5
推荐指数
1
解决办法
2685
查看次数