on_failure_callback triggered multiple times

Asked by use*_*992 · Tags: python, apache-kafka, airflow, airflow-2.x

I want to publish a single Kafka message when parallel Airflow tasks fail. My Airflow DAG is similar to the one below.

from datetime import datetime, timedelta
from airflow.models import Variable
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def task_failure_callback(context):
    ti = context['task_instance']
    print(f"task {ti.task_id } failed in dag { ti.dag_id }, error: {ti.xcom_pull(key='error')} ")
    #call function to publish kafka message


def task_success_callback(context):
    ti = context['task_instance']
    print(f"Task {ti.task_id } has succeeded in dag { ti.dag_id }.")
    #call function to publish kafka message    

def dag_success_callback(context):
    dag_status = f"DAG has succeeded, run_id: {context['run_id']}"
    print(dag_status)
    Variable.set("TEST_CALLBACK_DAG_STATUS", dag_status)
    #call function to publish kafka message

def dag_failure_callback(context):
    ti = context['task_instance']
    dag_status = f"DAG has failed, run_id: {context['run_id']}, task id: {ti.task_id}"
    print(dag_status)
    Variable.set("TEST_CALLBACK_DAG_STATUS", dag_status)
    #call function to publish kafka message

def user_func1(ti):
    try:
        input_val = int(Variable.get("TEST_CALLBACK_INPUT", 0))
        if input_val % 10 == 0:
            raise ValueError("Invalid Input")
    except Exception as e:
        ti.xcom_push(key="error", value=str(e))
        raise

def user_func2(ti):
    try:
        input_val = int(Variable.get("TEST_CALLBACK_INPUT", 0))
        if input_val % 2 == 0:
            raise ValueError("Invalid Input")
    except Exception as e:
        ti.xcom_push(key="error", value=str(e))
        raise

default_args = {
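    # note: everything in default_args is applied to each individual task,
    # so this failure callback fires once per failed task, not once per DAG run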
    "on_success_callback": None,
    "on_failure_callback": dag_failure_callback,
}

with DAG(
    dag_id="test_callbacks_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
) as dag:

    task1 = PythonOperator(task_id="task1", python_callable=user_func1)
    task2 = PythonOperator(task_id="task2", python_callable=user_func2)
    task3 = DummyOperator(task_id="task3", on_success_callback=task_success_callback)
    [task1, task2] >> task3


Airflow log when the parallel tasks fail:

[2022-10-08, 00:10:51 IST] {logging_mixin.py:115} INFO - DAG has failed, run_id: manual__2022-10-07T18:40:50.355282+00:00, task id: task1

[2022-10-08, 00:10:51 IST] {logging_mixin.py:115} INFO - DAG has failed, run_id: manual__2022-10-07T18:40:50.355282+00:00, task id: task2

As shown above, task1 and task2 are parallel tasks, and I use callback functions to publish the corresponding Kafka messages. In the success scenario, a single success message is published by the final task. The problem arises on task failure, specifically when the tasks run in parallel: if both task1 and task2 fail, Airflow fires on_failure_callback twice, once for task1 and once for task2. I agree this is the expected Airflow behavior, but for my requirement I don't want multiple on_failure_callback invocations. Once the first on_failure_callback has fired, no further callbacks should fire, because the receiving side is designed to handle a single error scenario, not multiple or batched errors.

I call the Kafka publish function inside the on_failure_callback function (dag_failure_callback). If task1 fails first, it publishes a message to the Kafka topic, and if task2 also fails, it publishes a second message to the same topic. I cannot prevent this, because the two tasks run in parallel and independently. I want to stop after the first Kafka publish to the topic and not publish messages for any further failures.

Please suggest how to restrict on_failure_callback to a single invocation when parallel tasks fail.

Answered by Dan*_*har:

You can handle the failed tasks with a trigger_rule plus a PythonOperator. Here is an example:

import logging

import pendulum
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id='test',
    start_date=pendulum.today('UTC').add(hours=-1),
    schedule_interval=None,
)


def green_task(ti: TaskInstance, **kwargs):
    logging.info('green')


def red_task(ti: TaskInstance, **kwargs):
    raise Exception('red')


def check_tasks(ti: TaskInstance, **kwargs):
    # find failed tasks. do what you need...
    for task in ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED):  # type: TaskInstance
        logging.info(f'failed dag: {task.dag_id}, task: {task.task_id}. url: {task.log_url}')


t1 = PythonOperator(
    dag=dag,
    task_id='green_task',
    python_callable=green_task,
)


t2 = PythonOperator(
    dag=dag,
    task_id='red_task1',
    python_callable=red_task,
)


t3 = PythonOperator(
    dag=dag,
    task_id='red_task2',
    python_callable=red_task,
)


check = PythonOperator(
    dag=dag,
    task_id='check',
    python_callable=check_tasks,
    trigger_rule=TriggerRule.NONE_SKIPPED,
)


t1 >> check
t2 >> check
t3 >> check
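
With trigger_rule=TriggerRule.NONE_SKIPPED, the check task runs once all of its upstream tasks have finished (succeeded or failed) as long as none were skipped, so it still executes when red_task1 and red_task2 fail.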

Run the DAG and look at the check task's log:

[2022-10-10, 15:12:39 UTC] {dag_test.py:27} INFO - failed dag: test, task: red_task1. url: http://localhost:8080/log?execution_date=2022-10-10T14%3A49%3A57.530923%2B00%3A00&task_id=red_task1&dag_id=test&map_index=-1
[2022-10-10, 15:12:39 UTC] {dag_test.py:27} INFO - failed dag: test, task: red_task2. url: http://localhost:8080/log?execution_date=2022-10-10T14%3A49%3A57.530923%2B00%3A00&task_id=red_task2&dag_id=test&map_index=-1
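
If the end goal is a single Kafka message, this check task is a natural place to publish it, because it runs exactly once per DAG run no matter how many upstream tasks fail. Below is a minimal sketch of such a check_tasks, assuming the kafka-python package; the broker address and topic name are placeholders:

import json
import logging

from airflow.models import TaskInstance
from airflow.utils.state import TaskInstanceState
from kafka import KafkaProducer  # assumes the kafka-python package is installed


def check_tasks(ti: TaskInstance, **kwargs):
    # collect every failed task instance in this DAG run
    failed = ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED)
    if not failed:
        return
    # build one consolidated payload instead of one message per failure
    payload = {
        'dag_id': ti.dag_id,
        'run_id': kwargs['run_id'],
        'failed_tasks': [t.task_id for t in failed],
    }
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',  # placeholder broker address
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    producer.send('dag-failures', payload)  # placeholder topic name
    producer.flush()
    logging.info('published one consolidated failure message: %s', payload)

Because check is a single downstream task, exactly one message is published per run regardless of how many upstream tasks fail.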

Alternatively, you can move the handling into an on_failure_callback:

def on_failure_callback(context):
    ti = context['task_instance']  # type: TaskInstance
    for task in ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED):  # type: TaskInstance
        # inspect each failed task and publish/handle as needed
        ...
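
Keep in mind that a task-level on_failure_callback still fires once per failed task, so this variant needs a guard to publish only once. One possible sketch uses an Airflow Variable as a per-run flag; the flag key name and the print placeholder are illustrative, and the check-then-set below is not atomic, so a narrow race window remains if two callbacks run at exactly the same instant:

from airflow.models import TaskInstance, Variable
from airflow.utils.state import TaskInstanceState


def on_failure_callback(context):
    ti = context['task_instance']  # type: TaskInstance
    run_id = context['run_id']
    flag_key = f"failure_published_{ti.dag_id}_{run_id}"  # hypothetical per-run flag
    # bail out if another task's callback already published for this run
    if Variable.get(flag_key, default_var=None) is not None:
        return
    Variable.set(flag_key, "sent")
    failed = ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED)
    # publish the single Kafka message here (once per run)
    print(f"first failure callback for run {run_id}, failed so far: {[t.task_id for t in failed]}")

Alternatively, note that an on_failure_callback passed directly to the DAG constructor (rather than through default_args) is invoked once per failed DAG run rather than once per failed task, which may match this requirement more directly.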