Airflow - 仅当另一个 DAG 上的所有任务都成功时才运行 DAG

Vic*_*acy 2 airflow

我对 DAG、Airflow 和 Python 语法有点陌生(我从 Java 学习编码),但我有一个 DAG,其中包含大约 10 个彼此独立的任务,并且我有另一个 DAG,只有在所有 10 个任务都成功时才能运行。因为按照我的方式,如果一项任务失败,DAG 仍然运行其他任务,并且 DAG 被标记为成功。(这就是我想要的)

有没有办法创建一个新任务(任务 11)来遍历其他任务并检查它们的状态?我找不到返回任务状态的函数

我在想这样的事情(假设有一个state()函数)

array_of_task_ids= [task1, task2, task3,...]
for i in array_of_tasks_ids
if array_of_task_id[i].state() == Failed 
Run Code Online (Sandbox Code Playgroud)

#这意味着如果它发现一个任务处于失败状态,它将运行一个新的虚拟任务,表明其中一个任务失败

 task_sensor_failed = DummyOperator(
 task_id='task_sensor_failed',
 dag=dag,
 )
Run Code Online (Sandbox Code Playgroud)

然后,在另一个 DAG 上,该 DAG 仅应在该任务“task_sensor_failed”未运行时运行,我将放置传感器

external_sensor= ExternalTaskSensor(
task_id='external_sensor',
external_dag_id='10_tasks_dag',
external_task_id='task_sensor_failed',
Run Code Online (Sandbox Code Playgroud)

...)

这不是我将使用的实际代码。我知道这是不对的,我只是想做一些简单的事情,这样你就明白我想要做什么。我不知道,也许这是一种愚蠢的做法,但就像我说的,我对此很陌生,所以我不确定我在做什么。

不管怎样,总体思路是,只有当另一个 DAG 的所有 10 个任务都成功时,我才能运行一个 DAG,有人可以帮助我完成这个任务吗?抱歉发了这么长的帖子,并提前感谢您的帮助!有什么建议么?

y2k*_*ham 7

一旦你知道了

  • ExternalTaskSensor还可以感知整个DAG(而不是特定taskDAG
  • 如果AirflowDAG的任何一个叶任务失败,则将其标记为失败(换句话说,仅当所有叶任务都成功时,Airflow 才将 DAG 标记为成功)

您无需在第一个 DAG 中添加任何虚拟任务即可完成此操作


就是这样

  1. 保持第一个 DAG 不变

  2. 让你的第二个 DAG 以感知第一个 DAG 的 开始ExternalTaskSensor只需指定external_dag_id而不指定external_task_id

  • 如果第一个 DAG 的任何一个任务失败,这将继续将其标记为失败
  • 但如果第一个 DAG 的所有任务都成功(即第一个 DAG 成功),仍然会让第二个 DAG 运行

作为扩展,如果它适合您的要求,您可以使第一个 DAG反应性地触发第二个 DAG(仅当所有任务都成功时),如下所示

  1. 在你的第一个 DAG 中,

  2. 保持第二个 DAG 不变