Airflow ExternalTaskSensor doesn't fail when the external task fails

pab*_*sjv 3 python airflow airflow-operator

I was trying to use the ExternalTaskSensor in Airflow 1.10.11 to coordinate some DAGs. I developed this code to test the functionality:

import time
from datetime import datetime, timedelta
from pprint import pprint

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State

sensors_dag = DAG(
    "test_launch_sensors",
    schedule_interval=None,
    start_date=datetime(2020, 2, 14, 0, 0, 0),
    dagrun_timeout=timedelta(minutes=150),
    tags=["DEMO"],
)

dummy_dag = DAG(
    "test_dummy_dag",
    schedule_interval=None,
    start_date=datetime(2020, 2, 14, 0, 0, 0),
    dagrun_timeout=timedelta(minutes=150),
    tags=["DEMO"],
)


def print_context(ds, **context):
    pprint(context['conf'])


with dummy_dag:
    starts = DummyOperator(task_id="starts", dag=dummy_dag)
    empty = PythonOperator(
        task_id="empty",
        provide_context=True,
        python_callable=print_context,
        dag=dummy_dag,
    )
    ends = DummyOperator(task_id="ends", dag=dummy_dag)

    starts >> empty >> ends

with sensors_dag:
    trigger = TriggerDagRunOperator(
        task_id=f"trigger_{dummy_dag.dag_id}",
        trigger_dag_id=dummy_dag.dag_id,
        conf={"key": "value"},
        execution_date="{{ execution_date }}",
    )
    sensor = ExternalTaskSensor(
        task_id="wait_for_dag",
        external_dag_id=dummy_dag.dag_id,
        external_task_id="ends",
        failed_states=["failed", "upstream_failed"],
        poke_interval=5,
        timeout=120,
    )
    trigger >> sensor

The idea is that one DAG triggers another with a TriggerDagRunOperator, which sets the execution_date to the same value in both DAGs. This works perfectly when the state of the dummy_dag's last task, ends, is success.

However, if I force the intermediate task to fail like so:

def print_context(ds, **context):
    pprint(context['conf'])
    raise Exception('ouch')

The sensor doesn't detect the failed or upstream_failed states, and it keeps running until it times out. I was using the failed_states parameter to indicate which states should be considered failures, but it doesn't seem to be working.

Am I doing something wrong?

Mar*_*ers 5

failed_states was added in Airflow 2.0; you can set it to ["failed"] to configure the sensor to fail the current DAG run when the monitored DAG run fails. If given a task ID, it monitors the task state, otherwise it monitors the DAG run state.
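For reference, a minimal Airflow 2.x sketch of that configuration (assuming the 2.x import path airflow.sensors.external_task; the DAG and task names are taken from the question):

from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    "test_launch_sensors",
    schedule_interval=None,
    start_date=datetime(2020, 2, 14),
) as sensors_dag:
    sensor = ExternalTaskSensor(
        task_id="wait_for_dag",
        external_dag_id="test_dummy_dag",
        external_task_id=None,       # None: monitor the DAG run state as a whole
        allowed_states=["success"],
        failed_states=["failed"],    # fail this sensor if the monitored run failed
        poke_interval=5,
        timeout=120,
    )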

Unfortunately, in Airflow 1.x the ExternalTaskSensor operation only compares the DAG run or task state against allowed_states; once the monitored DAG run or task reaches one of the allowed states, the sensor stops and is then always marked as successful. By default the sensor only looks for the SUCCESS state, so if the monitored DAG run fails and no timeout is set, it just keeps poking forever. And if you add failed to the allowed_states list, the sensor still only marks itself as successful.
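To make that pitfall concrete, here is a hypothetical 1.10.x configuration (not a fix): with failed added to allowed_states, the sensor stops poking as soon as the monitored run fails, but then reports success, the opposite of what the question needs:

sensor = ExternalTaskSensor(
    task_id="wait_for_dag",
    external_dag_id="test_dummy_dag",
    external_task_id="ends",
    allowed_states=["success", "failed"],  # stops on failure, but marks itself SUCCESS
    poke_interval=5,
    timeout=120,
)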

While you could rely on the timeout, like you I needed the sensor to fail its own DAG run if the external DAG run failed, as if the next task's dependency had not been met. Unfortunately, that requires writing your own sensor.

Here is my implementation; it is a simplified version of the ExternalTaskSensor() class, suited to my simpler needs (no checking for a specific task ID or for anything other than the same execution date):

from airflow.exceptions import AirflowFailException
from airflow.models import DagRun
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State

class ExternalDagrunSensor(BaseSensorOperator):
    """
    Waits for a different DAG to complete; if the dagrun has failed, this
    task fails itself as well.

    :param external_dag_id: The dag_id of the DAG run you want to
        wait for
    :type external_dag_id: str
    """

    template_fields = ["external_dag_id"]
    ui_color = "#19647e"

    @apply_defaults
    def __init__(self, external_dag_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.external_dag_id = external_dag_id

    @provide_session
    def poke(self, context, session=None):
        dag_id, execution_date = self.external_dag_id, context["execution_date"]
        self.log.info("Poking for %s on %s ... ", dag_id, execution_date)

        state = (
            session.query(DagRun.state)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.execution_date == execution_date,
                DagRun.state.in_((State.SUCCESS, State.FAILED)),
            )
            .scalar()
        )
        if state == State.FAILED:
            raise AirflowFailException(
                f"The external DAG run {dag_id} {execution_date} has failed"
            )
        return state is not None

The base sensor implementation calls the poke() method repeatedly until it returns True (or the optional timeout is reached), and raising AirflowFailException sets the task state to failed immediately, with no retrying. It is then up to the downstream task configuration whether they will be scheduled to run.
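A minimal sketch of wiring this custom sensor into the question's sensors_dag in place of the stock ExternalTaskSensor (names taken from the question's code; it assumes ExternalDagrunSensor is defined or importable in the same DAG file):

with sensors_dag:
    trigger = TriggerDagRunOperator(
        task_id=f"trigger_{dummy_dag.dag_id}",
        trigger_dag_id=dummy_dag.dag_id,
        conf={"key": "value"},
        execution_date="{{ execution_date }}",
    )
    # Watch the whole triggered DAG run rather than a single task; the
    # sensor fails its own DAG run if the triggered run ends up FAILED.
    sensor = ExternalDagrunSensor(
        task_id="wait_for_dag",
        external_dag_id=dummy_dag.dag_id,
        poke_interval=5,
        timeout=120,
    )
    trigger >> sensor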