如何配置Airflow,以便DAG中的任何故障都会(立即)导致松弛消息?
此时我通过创建slack_failed_task来管理它:
slack_failed_task = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
trigger_rule='one_failed',
token="...",
text = ':red_circle: DAG Failed',
icon_url = 'http://airbnb.io/img/projects/airflow3.png',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
并在DAG中的每个其他任务的上游设置此任务(one_failed):
slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
Run Code Online (Sandbox Code Playgroud)
它有效,但它容易出错,因为忘记添加任务会跳过松弛通知,看起来很多工作.
是否有可能扩展email_on_failureDAG中的财产?
奖金;-)包括将失败任务的名称传递给消息的方法.
刚开始使用dag callback(on_failure_callback和on_success_callback)时,我认为完成后会触发successor fail状态dag(如dag中定义)。但是随后似乎每个实例都task instance没有实例化它dag run,因此,如果DAG具有N个任务,它将触发N次这些回调。
我正在尝试捕获任务ID,因此发送到松弛状态。在阅读另一个相关问题时,我想到了以下内容:
def success_msg(context):
slack.slack_message(context['task_instance']); #send task-id to slack
def failure_msg(context):
slack.slack_message(context['task_instance']); #send task-id to slack
default_args = {
[...]
'on_failure_callback': failure_msg,
'on_success_callback': success_msg,
[...]
}
Run Code Online (Sandbox Code Playgroud)
但是它失败了,我应该如何解析上下文变量并因此获得任务ID?