Ant*_*ton 7 alert directed-acyclic-graphs airflow google-cloud-composer
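I'm currently working on setting up alerts for long-running tasks in Airflow. To cancel/fail the Airflow DAG I've put "dagrun_timeout" in the default_args, and it does what I need: it fails/errors the DAG when it has been running for too long (usually because it is stuck). The only problem is that the function in "on_failure_callback" doesn't get called when dagrun_timeout is exceeded, because "on_failure_callback" is at the task level (I think) while dagrun_timeout is at the DAG level.
How can I execute the "on_failure_callback" when dagrun_timeout is exceeded, or how can I specify a function to be called when the DAG fails? Or should I rethink my approach?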
Try setting on_failure_callback in the DAG declaration itself:
with DAG(
    dag_id="failure_callback_example",
    on_failure_callback=_on_dag_run_fail,
    ...
) as dag:
    ...
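The explanation is that an on_failure_callback defined in default_args only gets passed to the tasks being created, not to the DAG object.
Here is an example to try out this behaviour: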
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator


# DAG-level callback: called when the DAG run itself fails (including timeouts).
def _on_dag_run_fail(context):
    print("***DAG failed!! do something***")
    print(f"The DAG failed because: {context['reason']}")
    print(context)


# Task-level callback: handed to every task through default_args.
def _alarm(context):
    print("** Alarm Alarm!! **")
    task_instance: TaskInstance = context.get("task_instance")
    print(f"Task Instance: {task_instance} failed!")


default_args = {
    "owner": "mi_empresa",
    "email_on_failure": False,
    "on_failure_callback": _alarm,
}

with DAG(
    dag_id="failure_callback_example",
    start_date=datetime(2021, 9, 7),
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
    on_failure_callback=_on_dag_run_fail,
    dagrun_timeout=timedelta(seconds=45),
) as dag:

    # Sleeps longer than dagrun_timeout, so the DAG run times out.
    delayed = BashOperator(
        task_id="delayed",
        bash_command='echo "waiting..";sleep 60; echo "Done!!"',
    )

    will_fail = BashOperator(
        task_id="will_fail",
        bash_command="exit 1",
        # on_failure_callback=_alarm,
    )

    delayed >> will_fail
You can find the logs of the callback execution in the scheduler logs, under AIRFLOW_HOME/logs/scheduler/date/failure_callback_example:
[2021-09-24 13:12:34,285] {logging_mixin.py:104} INFO - [2021-09-24 13:12:34,285] {dag.py:862} INFO - Executing dag callback function: <function _on_dag_run_fail at 0x7f83102e8670>
[2021-09-24 13:12:34,336] {logging_mixin.py:104} INFO - ***DAG failed!! do something***
[2021-09-24 13:12:34,345] {logging_mixin.py:104} INFO - The DAG failed because: timed_out
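The context dict carries the key reason, which specifies why the DAG run failed. Some of its values are 'reason': 'timed_out' and 'reason': 'task_failure'. You can use this to perform specific behaviour in the callback depending on the reason for the DAG run failure.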
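As a rough illustration of branching on reason, here is a minimal sketch of a DAG-level callback that picks a different message for each of the two values mentioned above. It is plain Python and does not import Airflow, so it can be tried on its own. The names make_dag_failure_callback, notify and messages are hypothetical helpers made up for this example, not part of Airflow, and the context dicts passed at the bottom are hand-built stand-ins for what the scheduler would supply.

```python
from typing import Any, Callable, Dict, List


def make_dag_failure_callback(
    notify: Callable[[str], None],
) -> Callable[[Dict[str, Any]], None]:
    """Build a callback that branches on context['reason'].

    `notify` is a hypothetical stand-in for whatever alerting you use
    (email, chat webhook, pager, ...).
    """

    def _on_dag_run_fail(context: Dict[str, Any]) -> None:
        # Fall back to "unknown" in case the key is missing.
        reason = context.get("reason", "unknown")
        if reason == "timed_out":
            notify("DAG run exceeded dagrun_timeout; it may be stuck")
        elif reason == "task_failure":
            notify("DAG run failed because a task failed")
        else:
            notify(f"DAG run failed for another reason: {reason}")

    return _on_dag_run_fail


# Simulate the callback with hand-built context dicts (assumption: a real
# context from the scheduler contains many more keys than just 'reason').
messages: List[str] = []
callback = make_dag_failure_callback(messages.append)
callback({"reason": "timed_out"})
callback({"reason": "task_failure"})
print(messages)
```

In a DAG file you would then pass the returned function as on_failure_callback in the DAG declaration, the same way _on_dag_run_fail is passed in the example above.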