Question by All*_*Lee (score 5). Tags: airflow, airflow-scheduler, service-level-agreement
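I've been trying to get a Slack message callback to fire on an SLA miss. I've noticed that:
- SLA misses are registered successfully under `slamiss/list/` in the Airflow web UI
- `on_failure_callback` works fine
- however, the `sla_miss_callback` function itself is never triggered.

What I've tried:
- different combinations of adding `'sla'` and `'sla_miss_callback'` at the `default_args` level, the DAG level and the task level
- checking our scheduler and worker logs for SLA-related messages (https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L416), but we see nothing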
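```python
import logging
import time
from datetime import timedelta

import airflow
import airflow.utils.dates
from airflow.operators.python_operator import PythonOperator

LOGGER = logging.getLogger(__name__)

# send_task_failed_msg_to_slack and send_sla_miss_message_to_slack are our own
# Slack helpers, defined elsewhere (not shown in the question).
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(n=0, minute=1),
    "on_failure_callback": send_task_failed_msg_to_slack,
    "sla": timedelta(minutes=1),
    "retries": 0,
    "pool": "canary",
    "priority_weight": 1,
}

dag = airflow.DAG(
    dag_id="sla_test",
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval="*/5 * * * *",
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5),
)


def sleep():
    """Sleep for 90 seconds"""
    time.sleep(90)
    LOGGER.info("Slept for 90 seconds")


def simple_print(**context):
    """Prints a message"""
    print("Hello World!")


# Named sleep_task so the operator does not shadow the sleep() function.
sleep_task = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag,
)

simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag,
)

sleep_task >> simple_task
```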
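Airflow 1 (not tested on Airflow 2!)

Example of using `SLA missed` and `Execution Timeout` alerts:

You get an `SLA missed` alert roughly 2 minutes into the run, and an `Execution Timeout` alert once the task has been running for 4 minutes:

```python
"sla": timedelta(minutes=2),  # Default Task SLA time
"execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
```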
Also, the message includes the task's `log_url`, so you can open the task log in Airflow with one click.
```python
import time
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional, Tuple

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator

SLACK_STATUS_TASK_FAILED = ":red_circle: Task Failed"
SLACK_STATUS_EXECUTION_TIMEOUT = ":alert: Task Failed by Execution Timeout."


def send_slack_alert_sla_miss(
    dag: DAG,
    task_list: str,
    blocking_task_list: str,
    slas: List[Tuple],
    blocking_tis: List[TaskInstance],
) -> None:
    """Send `SLA missed` alert to Slack"""
    if not blocking_tis:
        # blocking_tis can be empty (e.g. the late task has since succeeded),
        # so fall back to the plain task list instead of raising IndexError.
        send_slack_alert(
            message=f":warning: Task SLA missed.\n*DAG*: {dag.dag_id}\n*Tasks*: {task_list}"
        )
        return
    task_instance: TaskInstance = blocking_tis[0]
    message = dedent(
        f"""
        :warning: Task SLA missed.
        *DAG*: {dag.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {task_instance.execution_date.strftime("%Y-%m-%d %H:%M:%S")} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Task State*: `{task_instance.state}`
        *Blocking Task List*: {blocking_task_list}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(message=message)


def send_slack_alert_task_failed(context: Dict[str, Any]) -> None:
    """Send `Task Failed` notification to Slack"""
    task_instance: TaskInstance = context.get("task_instance")
    exception: AirflowException = context.get("exception")
    status = SLACK_STATUS_TASK_FAILED
    if isinstance(exception, AirflowTaskTimeout):
        status = SLACK_STATUS_EXECUTION_TIMEOUT
    # Prepare formatted Slack message
    message = dedent(
        f"""
        {status}
        *DAG*: {task_instance.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {context.get("execution_date").to_datetime_string()} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Execution Timeout*: {task_instance.task.execution_timeout}
        _** Max time allowed for the execution of this task instance_
        *Task Duration*: {timedelta(seconds=round(task_instance.duration))}
        *Task State*: `{task_instance.state}`
        *Exception*: {exception}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(
        message=message,
        context=context,
    )


def send_slack_alert(
    message: str,
    context: Optional[Dict[str, Any]] = None,
) -> None:
    """Send prepared message to Slack"""
    slack_webhook_token = BaseHook.get_connection("slack").password
    notification = SlackWebhookOperator(
        task_id="slack_notification",
        http_conn_id="slack",
        webhook_token=slack_webhook_token,
        message=message,
        username="airflow",
    )
    notification.execute(context)


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "email": ["test@test.com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=2),  # Default Task SLA time
    "execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
    "on_failure_callback": send_slack_alert_task_failed,
}

with DAG(
    dag_id="test_sla",
    schedule_interval="*/5 * * * *",
    start_date=datetime(2021, 1, 11),
    default_args=default_args,
    sla_miss_callback=send_slack_alert_sla_miss,  # Must be set here, not in default_args!
) as dag:
    delay_python_task = PythonOperator(
        task_id="delay_five_minutes_python_task",
        sla=timedelta(minutes=2),  # Task-level SLA; takes precedence over default_args
        python_callable=lambda: time.sleep(300),
    )
```
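To make the timing concrete, here is a simplified model of when each alert is due, assuming Airflow 1.x semantics: a run's SLA deadline is the end of its schedule interval (the moment the scheduler triggers it) plus `sla`, while `execution_timeout` counts from the moment the task itself starts. A fixed 5-minute `timedelta` stands in for the `*/5 * * * *` cron schedule, and `sla_deadline` / `timeout_at` are illustrative helpers, not Airflow APIs.

```python
from datetime import datetime, timedelta

# Assumption: "*/5 * * * *" is modelled as a fixed 5-minute interval.
SCHEDULE_INTERVAL = timedelta(minutes=5)
SLA = timedelta(minutes=2)                # "sla" from default_args
EXECUTION_TIMEOUT = timedelta(minutes=4)  # "execution_timeout" from default_args


def sla_deadline(execution_date: datetime) -> datetime:
    """Simplified model: end of the schedule interval plus the SLA."""
    run_triggered_at = execution_date + SCHEDULE_INTERVAL
    return run_triggered_at + SLA


def timeout_at(task_started_at: datetime) -> datetime:
    """execution_timeout is measured from when the task starts running."""
    return task_started_at + EXECUTION_TIMEOUT


execution_date = datetime(2021, 1, 11, 10, 0)
# Assume the task starts as soon as the run is triggered.
started = execution_date + SCHEDULE_INTERVAL
print("SLA missed after:", sla_deadline(execution_date))  # 2021-01-11 10:07:00
print("Timed out at:", timeout_at(started))  # 2021-01-11 10:09:00
```

Under these assumptions, the `time.sleep(300)` task above would need until 10:10 to finish, so the SLA alert is due at 10:07 and the execution timeout stops the task at 10:09, which is what triggers the `Execution Timeout` message through `on_failure_callback`.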
Answer by 小智 (score 5)
I once ran into a similar situation.
While going through the scheduler logs, I found the following error:
```
[2020-07-08 09:14:32,781] {scheduler_job.py:534} INFO - --------------> ABOUT TO CALL SLA MISS CALL BACK
[2020-07-08 09:14:32,781] {scheduler_job.py:541} ERROR - Could not call sla_miss_callback for DAG
sla_miss_alert() takes 1 positional arguments but 5 were given
```
The problem is that your `sla_miss_callback` function takes only 1 argument, but it should actually look like this:
```python
def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Function that alerts me that dag_id missed sla"""
    # <function code here>
```
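The error in the log above follows from how the scheduler invokes the callback: in Airflow 1.x it calls `sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)` with five positional arguments. The sketch below reproduces that with a hypothetical `call_sla_miss_callback` stand-in for the scheduler, and plain strings and empty lists in place of the DAG, `SlaMiss` and `TaskInstance` objects, so it runs without Airflow.

```python
from typing import Any, Callable, List


def call_sla_miss_callback(callback: Callable[..., Any], dag: Any, task_list: str,
                           blocking_task_list: str, slas: List[Any],
                           blocking_tis: List[Any]) -> Any:
    """Hypothetical stand-in for the scheduler: five positional arguments."""
    return callback(dag, task_list, blocking_task_list, slas, blocking_tis)


def one_arg_alert(dag_id):
    """A one-argument callback, like the one that produced the error above."""
    return f"SLA missed: {dag_id}"


def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Matches the five-argument signature the scheduler uses."""
    return f"SLA missed in {dag}: {task_list.strip()}"


try:
    call_sla_miss_callback(one_arg_alert, "sla_test", "sleep on 2021-01-11", "", [], [])
except TypeError as exc:
    # Same failure mode as the scheduler log: 1 parameter, 5 arguments given.
    print("TypeError:", exc)

print(call_sla_miss_callback(sla_miss_alert, "sla_test", "sleep on 2021-01-11", "", [], []))
```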
For reference, see the Airflow source code.
Note: do not put `sla_miss_callback=sla_miss_alert` in `default_args`. It should be defined on the DAG itself.
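Why `default_args` is the wrong place: `default_args` are only used to fill in constructor arguments that an operator actually declares, and `sla_miss_callback` is a `DAG` parameter rather than an operator one, so the key is skipped without any error. The sketch below is a simplified, hypothetical model of that behaviour in Airflow 1.x's `apply_defaults` decorator; `apply_defaults_sketch` and `ToyOperator` are illustrative names, not Airflow code.

```python
import functools
import inspect
from datetime import timedelta
from typing import Any, Callable, Dict, Optional


def apply_defaults_sketch(init: Callable[..., None]) -> Callable[..., None]:
    """Hypothetical model: copy a default_args entry only if the
    constructor declares a parameter with that name."""
    declared = inspect.signature(init).parameters

    @functools.wraps(init)
    def wrapper(self: Any, *args: Any,
                default_args: Optional[Dict[str, Any]] = None,
                **kwargs: Any) -> None:
        # Undeclared keys (e.g. sla_miss_callback) are silently skipped.
        for name, value in (default_args or {}).items():
            if name in declared and name not in kwargs:
                kwargs[name] = value
        init(self, *args, **kwargs)

    return wrapper


class ToyOperator:
    """Hypothetical operator: like BaseOperator it declares `sla`,
    but it has no `sla_miss_callback` parameter."""

    @apply_defaults_sketch
    def __init__(self, task_id: str, sla: Optional[timedelta] = None,
                 **kwargs: Any) -> None:
        self.task_id = task_id
        self.sla = sla
        self.extra = kwargs


default_args = {"sla": timedelta(minutes=2), "sla_miss_callback": print}
op = ToyOperator(task_id="sleep", default_args=default_args)
print(op.sla)    # 0:02:00
print(op.extra)  # {}
```

The `sla` value reaches the task, but the callback never reaches the task or the DAG, which matches the symptom in the question: SLA misses are recorded, yet nothing is called.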
Answer by 小智 (score -2)
I ran into this problem myself. Unlike `on_failure_callback`, which expects a Python callable, `sla_miss_callback` seems to need a full function call.

An example that worked for me:
```python
from datetime import timedelta
from functools import partial

import airflow
import airflow.utils.dates


def sla_miss_alert(dag_id):
    """
    Function that alerts me that dag_id missed sla
    """
    # <function code here>


def task_failure_alert(dag_id, context):
    """
    Function that alerts me that a task failed
    """
    # <function code here>


dag_id = 'sla_test'
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    'on_failure_callback': partial(task_failure_alert, dag_id),
    'sla': timedelta(minutes=1),
    "retries": 0,
    "pool": 'canary',
    'priority_weight': 1
}
dag = airflow.DAG(
    dag_id=dag_id,
    default_args=default_args,
    # Evaluated when the file is parsed: the DAG receives sla_miss_alert(dag_id)'s return value.
    sla_miss_callback=sla_miss_alert(dag_id),
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)
```
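Since the first positional argument the scheduler passes is the DAG itself, a five-argument callback can read `dag.dag_id` directly. If you still want to bind extra values up front, the way `partial(task_failure_alert, dag_id)` does for `on_failure_callback` above, `functools.partial` works for `sla_miss_callback` as well and keeps it a callable. A sketch, assuming the five-argument signature from the previous answer; `channel` is a hypothetical extra argument, and plain strings and empty lists stand in for the Airflow objects.

```python
from functools import partial
from typing import Any, List


def sla_miss_alert(channel: str, dag: Any, task_list: str,
                   blocking_task_list: str, slas: List[Any],
                   blocking_tis: List[Any]) -> str:
    """Five scheduler arguments, plus one extra value bound up front.

    `channel` is a hypothetical extra argument used only for illustration.
    """
    return f"[{channel}] SLA missed: {task_list.strip()}"


# partial() returns a new callable that still accepts the five positional
# arguments the scheduler passes; nothing is executed at DAG-parse time.
callback = partial(sla_miss_alert, "#airflow-alerts")

# Simulate the scheduler's call.
print(callback("sla_test", "sleep on 2021-01-11T10:00:00", "", [], []))
```

You would then pass `sla_miss_callback=callback` when constructing the DAG.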
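As far as I can tell, `sla_miss_callback` has no access to the context, which is unfortunate. Once I stopped looking for the context, I finally received the alerts.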