use*_*992 3 python apache-kafka airflow airflow-2.x
我想在 Airflow 并行任务失败时只发布一条 Kafka 消息。我的 Airflow DAG 与下面类似。
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator
def task_failure_callback(context):
    """Task-level failure hook: log the failed task and the error it pushed to XCom."""
    failed_ti = context['task_instance']
    error = failed_ti.xcom_pull(key='error')
    print(f"task {failed_ti.task_id} failed in dag {failed_ti.dag_id}, error: {error} ")
    # call function to publish kafka message
def task_success_callback(context):
    """Task-level success hook: log which task succeeded in which DAG."""
    succeeded_ti = context['task_instance']
    print(f"Task {succeeded_ti.task_id} has succeeded in dag {succeeded_ti.dag_id}.")
    # call function to publish kafka message
def dag_success_callback(context):
    """DAG-level success hook: log and persist the run outcome in an Airflow Variable."""
    status_msg = f"DAG has succeeded, run_id: {context['run_id']}"
    print(status_msg)
    Variable.set("TEST_CALLBACK_DAG_STATUS", status_msg)
    # call function to publish kafka message
def dag_failure_callback(context):
    """Failure hook: log and persist which task failed.

    NOTE(review): as a task-level callback via default_args this fires once
    per failed task instance, not once per DAG run.
    """
    failed_ti = context['task_instance']
    status_msg = f"DAG has failed, run_id: {context['run_id']}, task id: {failed_ti.task_id}"
    print(status_msg)
    Variable.set("TEST_CALLBACK_DAG_STATUS", status_msg)
    # call function to publish kafka message
def user_func1(ti):
    """Fail when TEST_CALLBACK_INPUT is a multiple of 10.

    On failure, the error text is pushed to XCom under key 'error' so the
    failure callback can report it, then the exception is re-raised to mark
    the task as failed.
    """
    try:
        input_val = int(Variable.get("TEST_CALLBACK_INPUT", 0))
        if input_val % 10 == 0:
            raise ValueError("Invalid Input")
    except Exception as e:
        ti.xcom_push(key="error", value=str(e))
        # Bare `raise` re-raises the active exception with its original
        # traceback intact; `raise e` would restart the traceback here.
        raise
def user_func2(ti):
    """Fail when TEST_CALLBACK_INPUT is even.

    Mirrors user_func1: on failure the error text is pushed to XCom under
    key 'error' for the failure callback, then the exception is re-raised.
    """
    try:
        input_val = int(Variable.get("TEST_CALLBACK_INPUT", 0))
        if input_val % 2 == 0:
            raise ValueError("Invalid Input")
    except Exception as e:
        ti.xcom_push(key="error", value=str(e))
        # Bare `raise` preserves the original traceback (idiomatic re-raise).
        raise
# Applied to every task in the DAG: no success callback, and the failure
# callback fires for each failed task instance.
default_args = dict(
    on_success_callback=None,
    on_failure_callback=dag_failure_callback,
)
with DAG(
    dag_id="test_callbacks_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
) as dag:
    # Two parallel tasks fan in to a final join task that reports success.
    task1 = PythonOperator(task_id="task1", python_callable=user_func1)
    task2 = PythonOperator(task_id="task2", python_callable=user_func2)
    task3 = DummyOperator(task_id="task3", on_success_callback=task_success_callback)
    task1 >> task3
    task2 >> task3
Run Code Online (Sandbox Code Playgroud)
Airflow并行任务失败日志:
[2022-10-08, 00:10:51 IST] {logging_mixin.py:115} 信息 - DAG 失败,run_id:manual__2022-10-07T18:40:50.355282+00:00,任务 id:task1
[2022-10-08, 00:10:51 IST] {logging_mixin.py:115} 信息 - DAG 失败,run_id:manual__2022-10-07T18:40:50.355282+00:00,任务 id:task2
如上所述,task1 和 task2 是并行任务。我使用回调函数来触发相应的 Kafka 消息。对于成功场景,它会在最终任务期间触发一条成功消息。问题出在任务失败期间,主要是当任务并行运行时。如果 task1 和 task2 在并行运行期间都失败,则 Airflow 会为 task1 和 task2 分别触发一次 on_failure_callback。我同意这是 Airflow 的预期行为。但对于我的需求,我不想触发多个 on_failure_callback:当第一个 on_failure_callback 触发后,不应再触发后续的回调,因为接收方被设计为处理单个错误场景,而不是多个/批量错误。
我在on_failure_callback函数(dag_failure_callback)下编写了kafka消息调用函数,如果我的第一个task1失败,它会触发一条消息发送到kafka主题,同时如果task2也失败,它会触发第二条消息发送到同一个kafka主题,我无法处理它,因为两者既并行又独立运行。我想在第一个 kafka 发布该主题时停止,不想触发 kafka 消息以造成进一步的失败。
请建议,如何在并行任务失败期间限制 on_failure_callback 。
您可以使用trigger_rule+PythonOperator来处理失败的任务。这是一个例子:
import logging
import pendulum
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule
# Manually triggered DAG (no schedule); start_date is set an hour in the
# past so a manual run is immediately eligible.
dag = DAG(
    dag_id='test',
    schedule_interval=None,
    start_date=pendulum.today('UTC').add(hours=-1),
)
def green_task(ti: TaskInstance, **kwargs):
    """A task that always succeeds, emitting a single log line."""
    message = 'green'
    logging.info(message)
def red_task(ti: TaskInstance, **kwargs):
    """A task that always fails."""
    failure = Exception('red')
    raise failure
def check_tasks(ti: TaskInstance, **kwargs):
    """Inspect the current DAG run and log every FAILED task instance.

    Because this runs in a single downstream task, it sees all failures of
    the run at once — the place to emit exactly one notification.
    """
    failed_instances = ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED)
    for task in failed_instances:  # type: TaskInstance
        logging.info(f'failed dag: {task.dag_id}, task: {task.task_id}. url: {task.log_url}')
def _make_task(task_id, fn, **extra):
    """Build a PythonOperator bound to `dag` with the shared defaults."""
    return PythonOperator(
        dag=dag,
        task_id=task_id,
        python_callable=fn,
        # NOTE(review): provide_context looks like an Airflow 1.x carry-over;
        # presumably a no-op on 2.x — confirm against the Airflow version used.
        provide_context=True,
        **extra,
    )

t1 = _make_task('green_task', green_task)
t2 = _make_task('red_task1', red_task)
t3 = _make_task('red_task2', red_task)
# NONE_SKIPPED lets `check` run even when upstream tasks failed, so it can
# inspect all failures of the run in one place.
check = _make_task('check', check_tasks, trigger_rule=TriggerRule.NONE_SKIPPED)

[t1, t2, t3] >> check
Run Code Online (Sandbox Code Playgroud)
运行任务并查看check任务日志:
[2022-10-10, 15:12:39 UTC] {dag_test.py:27} INFO - failed dag: test, task: red_task1. url: http://localhost:8080/log?execution_date=2022-10-10T14%3A49%3A57.530923%2B00%3A00&task_id=red_task1&dag_id=test&map_index=-1
[2022-10-10, 15:12:39 UTC] {dag_test.py:27} INFO - failed dag: test, task: red_task2. url: http://localhost:8080/log?execution_date=2022-10-10T14%3A49%3A57.530923%2B00%3A00&task_id=red_task2&dag_id=test&map_index=-1
Run Code Online (Sandbox Code Playgroud)
或者您可以将处理移至on_failure_callback:
def on_failure_callback(context):
    """Failure callback that can see *all* failed tasks of the DAG run.

    Each failed task triggers its own callback, so deduplicate here: query
    every FAILED task instance of the run and act once (e.g. publish a
    single Kafka message keyed by run_id).
    """
    ti = context['task_instance']  # type: TaskInstance
    for task in ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED):  # type: TaskInstance
        # The original left a bare comment as the loop body, which is a
        # SyntaxError in Python; replace with real handling as needed.
        logging.info(f'failed dag: {task.dag_id}, task: {task.task_id}')
Run Code Online (Sandbox Code Playgroud)