Airflow sla_miss_callback function not triggering

All*_*Lee 5 airflow airflow-scheduler service-level-agreement

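I have been trying to get a Slack message callback to fire on SLA misses. I've noticed that:

  1. sla_misses are registered successfully in the Airflow web UI at slamiss/list/

  2. on_failure_callback works successfully

However, the sla_miss_callback function itself is never triggered.

What I've tried: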

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
    'on_failure_callback': send_task_failed_msg_to_slack,
    'sla': timedelta(minutes=1),
    "retries": 0, 
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

def sleep():
    """ Sleep for 90 seconds """
    time.sleep(90)
    LOGGER.info("Slept for 90 seconds")

def simple_print(**context):
    """ Prints a message """
    print("Hello World!")


sleep = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag
    )

simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag
    )

sleep >> simple_task


Mak*_*kyi 8

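Airflow 1 (not tested on Airflow 2!)

Example of using SLA missed and Execution Timeout alerts

  • First, you'll get an SLA missed alert after the task has been running for 2 minutes,
  • Then, after 4 minutes, the task will fail and raise an Execution Timeout alert.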
"sla": timedelta(minutes=2),  # Default Task SLA time
"execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout

Also, you have the log_url right in the message, so you can easily open the task log in Airflow.

Example Slack message

import time
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional, Tuple

from airflow import AirflowException
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.exceptions import AirflowTaskTimeout
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator

SLACK_STATUS_TASK_FAILED = ":red_circle: Task Failed"
SLACK_STATUS_EXECUTION_TIMEOUT = ":alert: Task Failed by Execution Timeout."


def send_slack_alert_sla_miss(
        dag: DAG,
        task_list: str,
        blocking_task_list: str,
        slas: List[Tuple],
        blocking_tis: List[TaskInstance],
) -> None:
    """Send `SLA missed` alert to Slack"""
    task_instance: TaskInstance = blocking_tis[0]
    message = dedent(
        f"""
        :warning: Task SLA missed.
        *DAG*: {dag.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {task_instance.execution_date.strftime("%Y-%m-%d %H:%M:%S")} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Task State*: `{task_instance.state}`
        *Blocking Task List*: {blocking_task_list}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(message=message)


def send_slack_alert_task_failed(context: Dict[str, Any]) -> None:
    """Send `Task Failed` notification to Slack"""
    task_instance: TaskInstance = context.get("task_instance")
    exception: AirflowException = context.get("exception")

    status = SLACK_STATUS_TASK_FAILED
    if isinstance(exception, AirflowTaskTimeout):
        status = SLACK_STATUS_EXECUTION_TIMEOUT

    # Prepare formatted Slack message
    message = dedent(
        f"""
        {status}
        *DAG*: {task_instance.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {context.get("execution_date").to_datetime_string()} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Execution Timeout*: {task_instance.task.execution_timeout}
        _** Max time allowed for the execution of this task instance_
        *Task Duration*: {timedelta(seconds=round(task_instance.duration))}
        *Task State*: `{task_instance.state}`
        *Exception*: {exception}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(
        message=message,
        context=context,
    )


def send_slack_alert(
        message: str,
        context: Optional[Dict[str, Any]] = None,
) -> None:
    """Send prepared message to Slack"""
    slack_webhook_token = BaseHook.get_connection("slack").password
    notification = SlackWebhookOperator(
        task_id="slack_notification",
        http_conn_id="slack",
        webhook_token=slack_webhook_token,
        message=message,
        username="airflow",
    )
    notification.execute(context)


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "email": ["test@test.com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=2),  # Default Task SLA time
    "execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
    "on_failure_callback": send_slack_alert_task_failed,
}

with DAG(
        dag_id="test_sla",
        schedule_interval="*/5 * * * *",
        start_date=datetime(2021, 1, 11),
        default_args=default_args,
        sla_miss_callback=send_slack_alert_sla_miss,  # Must be set here, not in default_args!
) as dag:
    delay_python_task = PythonOperator(
        task_id="delay_five_minutes_python_task",
        # Mike Milligan added this: SLA set directly on the task
        sla=timedelta(minutes=2),
        python_callable=lambda: time.sleep(300),
    )
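One defensive variation on the SLA callback above, as a minimal sketch. It assumes (this is not confirmed anywhere in the thread) that `blocking_tis` may sometimes arrive empty, in which case `blocking_tis[0]` would raise inside the callback. `build_sla_miss_message` is a hypothetical helper, not part of Airflow, and the `SimpleNamespace` object only stands in for a real TaskInstance so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def build_sla_miss_message(dag_id, task_list, blocking_task_list, blocking_tis):
    """Build the Slack text without assuming blocking_tis is non-empty."""
    # Fall back to placeholders when there is no task instance to inspect.
    first_ti = blocking_tis[0] if blocking_tis else None
    task_id = first_ti.task_id if first_ti else "n/a"
    log_url = first_ti.log_url if first_ti else "n/a"
    lines = [
        ":warning: Task SLA missed.",
        f"*DAG*: {dag_id}",
        f"*Task*: {task_id}",
        f"*Tasks that missed SLA*: {task_list}",
        f"*Blocking Task List*: {blocking_task_list}",
        f"*Log URL*: {log_url}",
    ]
    return "\n".join(lines)


# Stand-in for a TaskInstance, with only the two attributes used above.
fake_ti = SimpleNamespace(task_id="sleep", log_url="http://localhost:8080/log")
print(build_sla_miss_message("test_sla", "sleep", "sleep", [fake_ti]))
print(build_sla_miss_message("test_sla", "sleep", "", []))
```

Inside a real callback, the returned string would be passed to send_slack_alert(message=...) as in the answer's code.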

  • I'm using the same code but can't get it to work. The "failure" callback works, but the SLA miss is never triggered. I'm running GCP's Composer 2.0.17 on Airflow 2.2.5. (4 upvotes)

小智 5

I was once in a similar situation.
While investigating the scheduler logs, I found the following error:

[2020-07-08 09:14:32,781] {scheduler_job.py:534} INFO -  --------------> ABOUT TO CALL SLA MISS CALL BACK  
[2020-07-08 09:14:32,781] {scheduler_job.py:541} ERROR - Could not call sla_miss_callback for DAG 
sla_miss_alert() takes 1 positional arguments but 5 were given

The problem is that your sla_miss_callback function accepts only 1 argument, but it should actually look like this:

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Function that alerts me that dag_id missed sla"""
    # <function code here>

For reference, take a look at the Airflow source code.

Note: do not put sla_miss_callback=sla_miss_alert in default_args. It should be set in the DAG definition itself.
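The log above suggests the scheduler catches the error and only logs it, which would explain why the callback seems to fail silently. A minimal sketch of that behaviour in plain Python, with no Airflow required: `call_like_scheduler` is a hypothetical stand-in for the scheduler, not Airflow's actual code, and the argument values are placeholders.

```python
def one_arg_callback(context):
    """Shaped like an on_failure_callback: a single argument."""
    return "called"


def five_arg_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Shaped like the signature recommended above."""
    return f"SLA missed: {task_list}"


def call_like_scheduler(callback):
    """Pass five positional arguments and report errors instead of raising."""
    try:
        return callback("dag", "sleep", "", [], [])
    except TypeError as err:
        return f"Could not call sla_miss_callback: {err}"


print(call_like_scheduler(five_arg_callback))
print(call_like_scheduler(one_arg_callback))
```

The second call ends up in the error branch, so a callback with the wrong arity never runs its body, and the only trace is in the scheduler log.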


小智 -2

I ran into this problem myself. Unlike on_failure_callback, which looks for a Python callable, sla_miss_callback seems to need the full function call.

An example that worked for me:

def sla_miss_alert(dag_id):
    """
    Function that alerts me that dag_id missed sla
    """
    # <function code here>

def task_failure_alert(dag_id, context):
    """
    Function that alerts me that a task failed
    """
    # <function code here>


dag_id = 'sla_test'
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
    'on_failure_callback': partial(task_failure_alert, dag_id),
    'sla': timedelta(minutes=1),
    "retries": 0, 
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=sla_miss_alert(dag_id),
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)


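As far as I can tell, sla_miss_callback does not have access to the context, which is unfortunate. Once I stopped looking for the context, I finally got my alert.

  • This is a literal call to the function `sla_miss_alert`; you will never get useful information about the DAG run. Also, `missing 4 required positional arguments: 'task_list', 'blocking_task_list', 'slas', and 'blocking_tis'` (2 upvotes)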
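To make the comment's point concrete, here is a small sketch of the difference between passing a callable and calling the function while the DAG file is parsed. It is plain Python, and `channel`, `alert_now`, and the argument values are hypothetical names used only for illustration. If an extra argument is needed, `functools.partial` can bind it while leaving the five arguments from the earlier answer for the caller to supply.

```python
from functools import partial


def sla_miss_alert(channel, dag, task_list, blocking_task_list, slas, blocking_tis):
    """`channel` is a hypothetical extra argument bound ahead of time."""
    return f"[{channel}] SLA missed for: {task_list}"


def alert_now(dag_id):
    """Shaped like the answer's function: one argument, returns nothing."""
    return None


# A callable: nothing runs until it is invoked with the five arguments.
callback = partial(sla_miss_alert, "#alerts")

# A literal call: it runs immediately and leaves only its return value (None).
not_a_callback = alert_now("sla_test")

print(callable(callback), callable(not_a_callback))
print(callback("dag", "sleep", "", [], []))
```

With this shape, sla_miss_callback=callback hands over something that can still be called later, whereas sla_miss_callback=alert_now("sla_test") hands over None.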