气流 on_failure_callback

Sou*_*hah 12 airflow

我有一个 Airflow DAG,有两个任务:

  • 读取csv
  • 进程文件

他们自己工作得很好。我故意在 pandas Dataframe 中创建了一个拼写错误,以了解on_failure_callback其工作原理并查看它是否被触发。从日志来看似乎没有:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1197, in handle_failure
    task.on_failure_callback(context)
TypeError: on_failure_callback() takes 0 positional arguments but 1 was given
Run Code Online (Sandbox Code Playgroud)

为什么不on_failure_callback工作?

以下是 DAG 的直观表示:

在此输入图像描述

这是代码:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1197, in handle_failure
    task.on_failure_callback(context)
TypeError: on_failure_callback() takes 0 positional arguments but 1 was given
Run Code Online (Sandbox Code Playgroud)

Ela*_*lad 18

您需要为您的函数指定一个可以接收上下文的参数,这是由于 Airflow 如何触发on_failure_callback

def on_failure_callback(context):
    print("Fail works  !  ")
Run Code Online (Sandbox Code Playgroud)

请注意,在您的实现中,您无法从消息中得知哪个任务失败了,因此您可能需要将任务详细信息添加到错误消息中,例如:

def on_failure_callback(context):
    ti = context['task_instance']
    print(f"task {ti.task_id } failed in dag { ti.dag_id } ")
Run Code Online (Sandbox Code Playgroud)