将顶级DAG连接在一起

y2k*_*ham 6 airflow

我需要有几个相同(只有在不同参数)的顶级DAG小号那也可以用以下限制/假设一起触发:

  • 各个顶级DAG将会拥有,schedule_interval=None因为它们仅需要偶尔的手动触发
  • 但是,一系列DAG需要每天运行
  • 序列中DAG的顺序数量是固定的(在编写代码之前已知),并且很少更改(几个月内一次)
  • 无论DAG是失败还是成功,触发链都不得中断
  • 当前,它们必须串联运行。将来他们可能需要并行触发

因此,我为dags目录中的每个DAG创建了一个文件,现在必须将它们连接起来以便顺序执行。我确定了两种方法可以完成此操作:

  1. SubDagOperator

  2. TriggerDagRunOperator

    • 可在我的演示中使用,并行运行(不按顺序运行),因为它不等待触发的DAG完成才移至下一个
    • ExternalTaskSensor 可能有助于克服上述限制,但会使事情变得很混乱

我的问题是

  • 如何克服的局限性parent_id前缀dag_idSubDagS'
  • 如何迫使TriggerDagRunOperatorS 等待DAG完成
  • 是否有其他替代/更好的方法可以独立的(顶级)DAG连接在一起?
  • 我为每个顶级DAG 创建单独文件(仅在输入方面有所不同的DAG)的方法是否有解决方法?

我正在使用puckel / docker-airflow

  • Airflow 1.9.0-4
  • Python 3.6-slim
  • CeleryExecutorredis:3.2.7

编辑1

澄清@Viraj Parekh查询

您能否通过触发DAG等待完成DAG来提供更详细的含义?

当我触发import_parent_v1DAG时,TriggerDagRunOperator即使我依次将它们链接起来,它应该使用的所有3个外部DAG 也会开始并行运行。实际上,这些日志表明,虽然它们被一个接一个地触发,但执行在下一个DAG(TriggerDagRunOperator)之前完成。 在此处输入图片说明 在此处输入图片说明 注意:在此示例中,顶级DAG被命名为,importer_child_v1_db_X而其对应的task_ids(用于TriggerDagRunOperator)被命名为importer_v1_db_X

是否可以仅将TriggerDagRunOperator成为DAG中的最后一项任务?

我必须将几个相似的(仅在参数上有所不同)DAG链接在一起,以一个一个地触发它们。因此,我不仅可以放一个 TriggerDagRunOperator,而且可以放很多(这里是3个,但最多可以生产15个)

y2k*_*ham 5

@Viraj Parekh回答中得到提示,我得以TriggerDagRunOperator按预期的方式进行工作。我特此发布我的(部分)答案;会在情况变得清晰时更新。


如何克服的局限性parent_id前缀dag_idSubDagS'

正如@Viraj所说,没有直接的方法可以实现这一目标。扩展SubDagOperator以删除此支票可能有效,但我决定避开它


如何迫使TriggerDagRunOperatorS 等待DAG完成

  • 看一下实现,很明显,的工作TriggerDagRunOperator只是触发外部DAG;就是这样。默认情况下,它不应该等待DAG完成。因此,我正在观察的行为是可以理解的。

  • ExternalTaskSensor是显而易见的出路。但是,在学习基础知识时,Airflow我依赖于DAG的手动触发schedule_interval=None)。在这种情况下,ExternalTaskSensor很难准确地指定execution_date外部任务(正在等待其完成),否则将导致传感器卡住

  • 因此,从实施中获得提示,我通过等待所有相关任务的完成对行为进行了细微调整ExternalTaskSensortask_instance

    execution_date[external_task] >= execution_date[TriggerDagRunOperator] + execution_delta

    这样可以达到预期的结果:外部DAG依次运行。


我 为每个顶级DAG 创建单独文件(仅在输入方面有所不同的DAG)的方法是否有解决方法?

再次通过@Viraj可以通过使用以下方式将DAG分配给全局范围来完成globals()[dag_id] = DAG(..)


编辑1

也许我指的是不正确的资源(上面的链接已经失效),但是ExternalTaskSensor已经包含了参数execution_deltaexecution_date_fn来轻松限制execution_date检测的任务。

  • 晚了2年。对于“如何强制 `TriggerDagRunOperator` 等待 DAG 完成?”的问题,有一个名为 `wait_for_completion` 的参数,它只会在触发的 DAG 完成时将 Operator 标记为“成功”。文档[此处](https://airflow.apache.org/docs/apache-airflow/2.4.2/_api/airflow/operators/trigger_dagrun/index.html#airflow.operators.trigger_dagrun.TriggerDagRunOperator)。 (2认同)

归档时间:

查看次数:

1073 次

最近记录:

6 年,11 月 前