相关疑难解决方法(0)

气流失败的松弛消息

如何配置Airflow,以便DAG中的任何故障都会(立即)导致松弛消息?

此时我通过创建slack_failed_task来管理它:

slack_failed_task =  SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',
    token="...",
    text = ':red_circle: DAG Failed',
    icon_url = 'http://airbnb.io/img/projects/airflow3.png',
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

并在DAG中的每个其他任务的上游设置此任务(one_failed):

slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
Run Code Online (Sandbox Code Playgroud)

它有效,但它容易出错,因为忘记添加任务会跳过松弛通知,看起来很多工作.

是否有可能扩展email_on_failureDAG中的财产?

奖金;-)包括将失败任务的名称传递给消息的方法.

slack airflow apache-airflow

2
推荐指数
2
解决办法
6396
查看次数

气流-从dag上下文回调中解析任务ID

刚开始使用dag callbackon_failure_callbackon_success_callback)时,我认为完成后会触发successor fail状态dag(如dag中定义)。但是随后似乎每个实例都task instance没有实例化它dag run,因此,如果DAG具有N个任务,它将触发N次这些回调。

我正在尝试捕获任务ID,因此发送到松弛状态。在阅读另一个相关问题时,我想到了以下内容:

def success_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

def failure_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}
Run Code Online (Sandbox Code Playgroud)

但是它失败了,我应该如何解析上下文变量并因此获得任务ID?

airflow

0
推荐指数
1
解决办法
3965
查看次数

标签 统计

airflow ×2

apache-airflow ×1

slack ×1