tsa*_*sch 5 python airflow google-cloud-composer
我正在运行composer-1.16.6-airflow-1.10.15。
对于每日计划的 DAG,我想编写一个自定义on_failure_notification,仅在任务实例连续多天失败时发送通知。我的计划是获取 dag 运行的失败任务实例并检查每个最后成功执行日期:
def my_on_failure_notification(context):
failed_tis = context["dag_run"].get_task_instances(state=State.FAILED)
tis_to_notify_about = [ti.task_id for ti in failed_tis if ti.previous_execution_date_success < days_ago(2)]
Run Code Online (Sandbox Code Playgroud)
此操作失败并出现以下跟踪:
[...]
File "/home/airflow/gcs/dags/xxx.py", line 94, in my_on_failure_notification
ti.task_id for ti in failed_tis if ti.previous_execution_date_success < days_ago(2)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 625, in previous_execution_date_success
prev_ti = self._get_previous_ti(state=State.SUCCESS)
File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 582, in _get_previous_ti
dag = self.task.dag
AttributeError: 'TaskInstance' object has no attribute 'task'
Run Code Online (Sandbox Code Playgroud)
我认为发生这种情况是因为 TI 是作为 SQLAlchemy 模型检索的,该模型不包含该task属性。这是有意的行为吗?有建议的替代方案吗?