整个DAG的气流depends_on_past

cho*_*433 6 airflow apache-airflow airflow-scheduler

有没有一种方法可以使用depends_on_past整个DagRun,而不仅仅是应用于任务?

我有一个每日DAG,而周五DagRun在第四个任务上出错,但周六和周日DagRuns仍按计划进行.使用depends_on_past = True会在相同的第4个任务上暂停DagRun,但前3个任务仍然会运行.

我可以在DagRun DB表中看到有一个state包含failedFriday DagRun 的列.我想要的是一种方法,如果前一个DagRun失败,配置DagRun不启动,在找到之前失败的任务之前不启动和运行.

有谁知道这是否可能?

Wei*_*Lin 8

在第一个任务中,设置depends_on_past=Truewait_for_downstream=True,组合将导致当前dag-run仅在最后一次运行成功时运行.

因为通过在当前dag-run中设置第一个任务将等待先前(depends_on_past)和所有任务(wait_for_downstream)成功

  • 在使用 wait_for_downstream 的地方,depends_on_past 被强制为 True。 (3认同)
  • @Tameem,这是不正确的,depends_on_past 检查前一个实例是否已成功或已被跳过而不仅仅是完成。其实这个答案并不正确。wait_for_downstream 仅检查立即直接下游任务,因此是第一级下游任务。这意味着,如果 DAG 的最后一个任务发生故障,通过将 dependent_on_past 设置为第一个任务,不会阻止您运行下一个 DAGRun (3认同)
  • 这个答案是错误的,并且不执行发问者所要求的。这个问题的答案是正确的:/sf/answers/3007145781/ (2认同)
  • 不同意@PriksoNAI - 答案是正确的。`wait_for_downstream` 是一个增强 `depends_on_past` 的配置。`depends_on_past` 将仅检查上一个任务(因此在给出的示例中,步骤 1-3 仍将在周六运行)。`wait_for_downstream` 更进一步,检查所有*先前运行的*下游任务是否也成功。在给出的示例中,这意味着步骤 1 不会在周六运行,因为步骤 4 在周五失败,该步骤位于周五步骤 1 的下游。 (2认同)

Pon*_*wor 6

这个问题有点老了,但事实证明它是第一个谷歌搜索结果,评分最高的答案显然具有误导性(这让我有点挣扎),所以它肯定需要一个正确的答案。虽然第二个评级的答案应该有效,但有一种简洁的方法可以做到这一点,我个人发现使用 xcom 很丑陋。

Airflow 有一个特殊的操作员类,旨在监视来自其他 dag 运行或其他 dag 的任务状态作为一个整体。所以我们需要做的是在我们的 dag 中的所有任务之前添加一个任务,检查上一次运行是否成功。

from airflow.sensors.external_task_sensor import ExternalTaskSensor


previous_dag_run_sensor = ExternalTaskSensor(
    task_id = 'previous_dag_run_sensor',
    dag = our_dag,
    external_dag_id = our_dag.dag_id,
    execution_delta = our_dag.schedule_interval
)

previous_dag_run_sensor.set_downstream(vertices_of_indegree_zero_from_our_dag)
Run Code Online (Sandbox Code Playgroud)


Joe*_*nek 5

一种可能的解决方案是使用xcom

  1. 将 2 个 PythonOperatorsstart_task和添加end_task到 DAG。
  2. 使所有其他任务依赖 start_task
  3. Makeend_task依赖于所有其他任务 ( set_upstream)。
  4. end_task将始终将变量推last_success = context['execution_date']送到 xcom ( xcom_push)。(provide_context = True在 PythonOperators 中需要)。
  5. 并且start_task将始终检查 xcom( xcom_pull) 以查看是否存在last_success值等于前一个 DagRun 的 execution_date 或 DAG 的 start_date(让进程开始)的变量。

xcom 的使用示例:https :
//github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py