Airflow: DAG marked as "success" if one task fails, because of trigger rule ALL_DONE

nor*_*bjd 8 python-2.7 airflow

I have the following DAG with 3 tasks:

start --> special_task --> end

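The task in the middle can succeed or fail, but end must always be executed (imagine it is a task that cleanly closes resources). For that, I used the trigger rule ALL_DONE: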

end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE

With that, end is properly executed if special_task fails. However, since end is the last task and it succeeds, the DAG is always marked as SUCCESS.

How can I configure my DAG so that if one of the tasks fails, the whole DAG is marked as FAILED?

Example to reproduce

import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import trigger_rule

dag = DAG(
    dag_id='my_dag',
    start_date=datetime.datetime.today(),
    schedule_interval=None
)

start = BashOperator(
    task_id='start',
    bash_command='echo start',
    dag=dag
)

special_task = BashOperator(
    task_id='special_task',
    bash_command='exit 1', # force failure
    dag=dag
)

end = BashOperator(
    task_id='end',
    bash_command='echo end',
    dag=dag
)
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE

start.set_downstream(special_task)
special_task.set_downstream(end)

This post seems to be related, but its answer does not suit my needs, since the downstream task end must be executed (hence the mandatory trigger_rule).

nor*_*bjd 7

As @JustinasMarozas explained in a comment, a solution is to create a dummy task like:

from airflow.operators.dummy_operator import DummyOperator

dummy = DummyOperator(
    task_id='test',
    dag=dag
)

and bind it downstream of special_task:

special_task.set_downstream(dummy)

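This way, the dummy task is marked as upstream_failed and the DAG is marked as failed. This works because the dummy task is a leaf of the DAG with the default trigger rule, so the failure of special_task propagates to it and from there to the state of the DAG run.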

I am hoping for an out-of-the-box solution, but until one exists, this does the job.
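
To illustrate why the extra leaf task changes the outcome, here is a minimal sketch in plain Python. It is a simplified model, not Airflow's actual implementation: it assumes that a DAG run is marked failed when any leaf task (a task with no downstream tasks) ends up failed or upstream_failed. The names leaf_tasks and dag_run_state are hypothetical and exist only for this illustration.

```python
# Simplified, hypothetical model of how a DAG run's final state follows from
# its leaf tasks. This is NOT Airflow's actual implementation.

FAILED_STATES = {"failed", "upstream_failed"}


def leaf_tasks(downstream):
    """Return the tasks that have no downstream tasks.

    `downstream` maps each task id to the list of its downstream task ids.
    """
    return sorted(task for task, children in downstream.items() if not children)


def dag_run_state(downstream, task_states):
    """Return "failed" if any leaf task is failed or upstream_failed."""
    leaves = leaf_tasks(downstream)
    if any(task_states[leaf] in FAILED_STATES for leaf in leaves):
        return "failed"
    return "success"


# Original DAG: start --> special_task --> end, where end uses ALL_DONE
# and therefore runs (and succeeds) even though special_task failed.
original = {"start": ["special_task"], "special_task": ["end"], "end": []}
original_states = {"start": "success", "special_task": "failed", "end": "success"}

# Same DAG with the dummy task "test" attached downstream of special_task.
with_dummy = {
    "start": ["special_task"],
    "special_task": ["end", "test"],
    "end": [],
    "test": [],
}
with_dummy_states = dict(original_states, test="upstream_failed")

print(dag_run_state(original, original_states))      # only leaf is "end"
print(dag_run_state(with_dummy, with_dummy_states))  # leaves are "end" and "test"
```

Under this model the original DAG has a single leaf, end, which succeeds, so the run is reported as successful. Adding the dummy task creates a second leaf that inherits the failure of special_task.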


Bas*_*lak 5

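I thought this was an interesting question and spent some time figuring out how to achieve it without an extra dummy task. It turned out a bit more involved than strictly necessary, but here is the end result: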

This is the full DAG:

import airflow
from airflow import AirflowException
from airflow.models import DAG, TaskInstance, BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

default_args = {"owner": "airflow", "start_date": airflow.utils.dates.days_ago(3)}

dag = DAG(
    dag_id="finally_task_set_end_state",
    default_args=default_args,
    schedule_interval="0 0 * * *",
    description="Answer for question https://stackoverflow.com/questions/51728441",
)

start = BashOperator(task_id="start", bash_command="echo start", dag=dag)
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)


@provide_session
def _finally(task, execution_date, dag, session=None, **_):
    upstream_task_instances = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag.dag_id,
            TaskInstance.execution_date == execution_date,
            TaskInstance.task_id.in_(task.upstream_task_ids),
        )
        .all()
    )
    upstream_states = [ti.state for ti in upstream_task_instances]
    fail_this_task = State.FAILED in upstream_states

    print("Do logic here...")

    if fail_this_task:
        raise AirflowException("Failing task because one or more upstream tasks failed.")


finally_ = PythonOperator(
    task_id="finally",
    python_callable=_finally,
    trigger_rule=TriggerRule.ALL_DONE,
    provide_context=True,
    dag=dag,
)

successful_task = DummyOperator(task_id="successful_task", dag=dag)

start >> [failing_task, successful_task] >> finally_

Look at the _finally function, which is called by the PythonOperator. There are a few key points here:

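  1. Annotate the function with @provide_session and add the argument session=None, so that you can query the Airflow database through session.
  2. Query all upstream task instances of the current task: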
upstream_task_instances = (
    session.query(TaskInstance)
    .filter(
        TaskInstance.dag_id == dag.dag_id,
        TaskInstance.execution_date == execution_date,
        TaskInstance.task_id.in_(task.upstream_task_ids),
    )
    .all()
)
  3. From the returned task instances, get the states and check whether State.FAILED is among them:
upstream_states = [ti.state for ti in upstream_task_instances]
fail_this_task = State.FAILED in upstream_states
  4. Perform your own logic:
print("Do logic here...")
  5. Finally, fail the task if fail_this_task=True:
if fail_this_task:
    raise AirflowException("Failing task because one or more upstream tasks failed.")
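
The state check in _finally can also be pulled out into a small pure function, which makes it testable without an Airflow database. The sketch below is one possible variant, not part of Airflow: should_fail and FAILURE_STATES are hypothetical names, and the strings are assumed to match the values of State.FAILED and State.UPSTREAM_FAILED. Treating upstream_failed as a failure is an extension of the check above, which only looks for State.FAILED; it would matter if a direct upstream task was skipped over because its own upstream had failed.

```python
# Hypothetical helper for illustration; not part of Airflow.
# The strings are assumed to equal State.FAILED and State.UPSTREAM_FAILED.
FAILURE_STATES = frozenset(["failed", "upstream_failed"])


def should_fail(upstream_states, failure_states=FAILURE_STATES):
    """Return True if any upstream state counts as a failure.

    `upstream_states` is a list of state strings (or None for tasks that
    have no recorded state yet), such as the `upstream_states` list built
    inside _finally.
    """
    return any(state in failure_states for state in upstream_states)


print(should_fail(["success", "failed"]))           # one upstream task failed
print(should_fail(["success", "success"]))          # everything succeeded
print(should_fail(["success", "upstream_failed"]))  # failure further upstream
print(should_fail([None, "success"]))               # missing state is not a failure
```

Inside _finally, the line that computes fail_this_task could then read fail_this_task = should_fail(upstream_states), with the rest of the function unchanged.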

Final result: [screenshot in the original post]