如何跳过气流中的任务而不跳过其下游任务?

She*_*esh 6 directed-acyclic-graphs airflow airflow-scheduler

让 \xe2\x80\x99s 说这是我的 dag:\nA >> B >> C

\n

如果任务 B 引发异常,我想跳过该任务而不是失败。但是,我不想跳过任务 C。我研究了 AirflowSkipException 和 soft_fail 传感器,但它们也强制跳过下游任务。有人有办法让这个工作吗?

\n

谢谢!

\n

小智 8

目前发布的答案涉及不同的主题或似乎不完全正确。

向任务 C添加触发规则all_failed对于 OP 的示例 DAG 不起作用: A >> B >> C除非任务 A 以failed状态结束,这很可能是不可取的。

事实上,OP 非常接近,因为可以通过组合AirflowSkipExceptionnone_failed触发规则来实现预期行为:

from datetime import datetime

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="mydag",
    start_date=datetime(2022, 1, 18),
    schedule_interval="@once"
) as dag:

    def task_b():
        raise AirflowSkipException

    A = DummyOperator(task_id="A")
    B = PythonOperator(task_id="B", python_callable=task_b)
    C = DummyOperator(task_id="C", trigger_rule="none_failed")

    A >> B >> C
Run Code Online (Sandbox Code Playgroud)

其中Airflow执行如下:

在此输入图像描述

这条规则意味着什么?

触发规则

none_failed:所有上游任务均未失败或upstream_failed - 即所有上游任务均已成功或已被跳过

因此基本上我们可以捕获代码中的实际异常并引发提到的 Airflow 异常,该异常“强制”任务状态从failed变为skipped。然而,如果没有trigger_ruleTask-C 的参数,我们最终会将 Task-B 下游标记为skipped


kng*_*yen 0

您可以更改trigger_rule任务声明中的 。

task = BashOperator(
    task_id="task_C",
    bash_command="echo hello world",
    trigger_rule="all_done",
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)