Airflow: trigger "on_failure_callback" when "dagrun_timeout" is exceeded

Asked by Ant*_*ton (score: 7) · Tags: alert, directed-acyclic-graphs, airflow, google-cloud-composer

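I'm currently working on setting up alerts for long-running tasks in Airflow. To cancel/fail an Airflow DAG, I put "dagrun_timeout" in default_args, and it does what I need: when a DAG run takes too long (usually because it is stuck), it fails/errors out. The only problem is that when dagrun_timeout is exceeded, the function in "on_failure_callback" is not called, because "on_failure_callback" is at the task level (I think), whereas dagrun_timeout is at the DAG level.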

How can I make "on_failure_callback" run when dagrun_timeout is exceeded, or how can I specify a function to be called when the DAG fails? Or should I rethink my approach?

Answer by Nic*_*coE (score: 6)

Try setting on_failure_callback in the DAG declaration itself:

with DAG(
    dag_id="failure_callback_example",
    on_failure_callback=_on_dag_run_fail,
    ...
) as dag:
...

The reason is that an on_failure_callback defined in default_args is only passed to the tasks being created, not to the DAG object itself.
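To make the distinction concrete, here is a toy model in plain Python. The ToyDAG and ToyTask classes are hypothetical stand-ins, not Airflow's actual implementation; they only mimic the idea that keys in default_args are merged into each task's keyword arguments, while a callback given to the DAG constructor is stored on the DAG alone.

```python
from typing import Any, Callable, Dict, Optional

Callback = Callable[[Dict[str, Any]], None]


class ToyDAG:
    """Hypothetical stand-in for a DAG: holds default_args and its OWN callback."""

    def __init__(self, default_args: Optional[Dict[str, Any]] = None,
                 on_failure_callback: Optional[Callback] = None) -> None:
        self.default_args = default_args or {}
        # Only set when passed to the DAG constructor, never from default_args.
        self.on_failure_callback = on_failure_callback


class ToyTask:
    """Hypothetical stand-in for an operator: missing kwargs come from dag.default_args."""

    def __init__(self, task_id: str, dag: ToyDAG, **kwargs: Any) -> None:
        merged = {**dag.default_args, **kwargs}  # explicit task kwargs win
        self.task_id = task_id
        self.on_failure_callback: Optional[Callback] = merged.get("on_failure_callback")


def alarm(context: Dict[str, Any]) -> None:
    print("task failed")


dag = ToyDAG(default_args={"on_failure_callback": alarm})
task = ToyTask("delayed", dag)

print(task.on_failure_callback is alarm)  # True: the task inherited the callback
print(dag.on_failure_callback is None)    # True: the DAG itself has no callback
```

In this model, the only way to give the DAG a failure callback is to pass it to the DAG constructor, which mirrors the fix suggested above.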

Here is an example you can use to try this behavior:


from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator


def _on_dag_run_fail(context):
    print("***DAG failed!! do something***")
    print(f"The DAG failed because: {context['reason']}")
    print(context)


def _alarm(context):
    print("** Alarm Alarm!! **")
    task_instance: TaskInstance = context.get("task_instance")
    print(f"Task Instance: {task_instance} failed!")


default_args = {
    "owner": "mi_empresa",
    "email_on_failure": False,
    "on_failure_callback": _alarm,  # task-level: copied into each operator
}


with DAG(
    dag_id="failure_callback_example",
    start_date=datetime(2021, 9, 7),
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
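    # DAG-level callback: called when the DAG run itself fails (including timeouts)
    on_failure_callback=_on_dag_run_fail,
    # The "delayed" task sleeps 60 s, so this 45 s timeout fails the run
    dagrun_timeout=timedelta(seconds=45),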
) as dag:

    delayed = BashOperator(
        task_id="delayed",
        bash_command='echo "waiting..";sleep 60; echo "Done!!"',
    )
    will_fail = BashOperator(
        task_id="will_fail",
        bash_command="exit 1",
        # on_failure_callback=_alarm,  # redundant: already supplied via default_args
    )

    delayed >> will_fail


You can find the log of the callback execution in the scheduler logs under AIRFLOW_HOME/logs/scheduler/date/failure_callback_example:

[2021-09-24 13:12:34,285] {logging_mixin.py:104} INFO - [2021-09-24 13:12:34,285] {dag.py:862} INFO - Executing dag callback function: <function _on_dag_run_fail at 0x7f83102e8670>
[2021-09-24 13:12:34,336] {logging_mixin.py:104} INFO - ***DAG failed!! do something***
[2021-09-24 13:12:34,345] {logging_mixin.py:104} INFO - The DAG failed because: timed_out


Edit:

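The context dictionary passed to the callback contains a reason key that states why the DAG run failed. Some possible values are 'timed_out' and 'task_failure'. You can use it in the callback to run specific behavior depending on why the DAG run failed.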