我对 DAG、Airflow 和 Python 语法有点陌生(我从 Java 学习编码),但我有一个 DAG,其中包含大约 10 个彼此独立的任务,并且我有另一个 DAG,只有在所有 10 个任务都成功时才能运行。因为按照我的方式,如果一项任务失败,DAG 仍然运行其他任务,并且 DAG 被标记为成功。(这就是我想要的)
有没有办法创建一个新任务(任务 11)来遍历其他任务并检查它们的状态?我找不到返回任务状态的函数
我在想这样的事情(假设有一个state()
函数)
array_of_task_ids= [task1, task2, task3,...]
for i in array_of_tasks_ids
if array_of_task_id[i].state() == Failed
Run Code Online (Sandbox Code Playgroud)
#这意味着如果它发现一个任务处于失败状态,它将运行一个新的虚拟任务,表明其中一个任务失败
task_sensor_failed = DummyOperator(
task_id='task_sensor_failed',
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
然后,在另一个 DAG 上,该 DAG 仅应在该任务“task_sensor_failed”未运行时运行,我将放置传感器
external_sensor= ExternalTaskSensor(
task_id='external_sensor',
external_dag_id='10_tasks_dag',
external_task_id='task_sensor_failed',
Run Code Online (Sandbox Code Playgroud)
...)
这不是我将使用的实际代码。我知道这是不对的,我只是想做一些简单的事情,这样你就明白我想要做什么。我不知道,也许这是一种愚蠢的做法,但就像我说的,我对此很陌生,所以我不确定我在做什么。
不管怎样,总体思路是,只有当另一个 DAG 的所有 10 个任务都成功时,我才能运行一个 DAG,有人可以帮助我完成这个任务吗?抱歉发了这么长的帖子,并提前感谢您的帮助!有什么建议么?
一旦你知道了
ExternalTaskSensor
还可以感知整个DAG
(而不是特定task
的DAG
)DAG
的任何一个叶任务失败,则将其标记为失败(换句话说,仅当所有叶任务都成功时,Airflow 才将 DAG 标记为成功)您无需在第一个 DAG 中添加任何虚拟任务即可完成此操作
就是这样
保持第一个 DAG 不变
让你的第二个 DAG 以感知第一个 DAG 的 开始ExternalTaskSensor
(只需指定external_dag_id
而不指定external_task_id
)
作为扩展,如果它适合您的要求,您可以使第一个 DAG反应性地触发第二个 DAG(仅当所有任务都成功时),如下所示
在你的第一个 DAG 中,
TriggerDagRunOperator
放置一个trigger_rule=TriggerRule.ALL_SUCCESS
(默认)和upstream_tasks_list >> trigger_task
trigger_dag_id='my_2nd_dag_id'
保持第二个 DAG 不变
归档时间: |
|
查看次数: |
4214 次 |
最近记录: |