我有一个 Airflow DAG,有两个任务:
他们自己工作得很好。我故意在 pandas Dataframe 中创建了一个拼写错误,以了解on_failure_callback其工作原理并查看它是否被触发。从日志来看似乎没有:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1197, in handle_failure
task.on_failure_callback(context)
TypeError: on_failure_callback() takes 0 positional arguments but 1 was given
Run Code Online (Sandbox Code Playgroud)
为什么不on_failure_callback工作?
以下是 DAG 的直观表示:
这是代码:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1197, in handle_failure
task.on_failure_callback(context)
TypeError: on_failure_callback() takes 0 positional arguments but 1 was given
Run Code Online (Sandbox Code Playgroud)
Ela*_*lad 18
您需要为您的函数指定一个可以接收上下文的参数,这是由于 Airflow 如何触发on_failure_callback
def on_failure_callback(context):
print("Fail works ! ")
Run Code Online (Sandbox Code Playgroud)
请注意,在您的实现中,您无法从消息中得知哪个任务失败了,因此您可能需要将任务详细信息添加到错误消息中,例如:
def on_failure_callback(context):
ti = context['task_instance']
print(f"task {ti.task_id } failed in dag { ti.dag_id } ")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
31288 次 |
| 最近记录: |