气流失败的松弛消息

Tom*_*ous 2 slack airflow apache-airflow

如何配置Airflow,以便DAG中的任何故障都会(立即)导致松弛消息?

此时我通过创建slack_failed_task来管理它:

slack_failed_task =  SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',
    token="...",
    text = ':red_circle: DAG Failed',
    icon_url = 'http://airbnb.io/img/projects/airflow3.png',
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

并在DAG中的每个其他任务的上游设置此任务(one_failed):

slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
Run Code Online (Sandbox Code Playgroud)

它有效,但它容易出错,因为忘记添加任务会跳过松弛通知,看起来很多工作.

是否有可能扩展email_on_failureDAG中的财产?

奖金;-)包括将失败任务的名称传递给消息的方法.

小智 15

也许这个例子会有所帮助:

def slack_failed_task(contextDictionary, **kwargs):  
       failed_alert = SlackAPIPostOperator(
         task_id='slack_failed',
         channel="#datalabs",
         token="...",
         text = ':red_circle: DAG Failed',
         owner = '_owner',)
         return failed_alert.execute


task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
Run Code Online (Sandbox Code Playgroud)


Dee*_*mal 5

尝试Airflow 版本 >=1.10.0 中的新SlackWebhookOperator

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_msg="Hi Wssup?"

slack_test =  SlackWebhookOperator(
    task_id='slack_test',
    http_conn_id='slack_connection',
    webhook_token='/1234/abcd',
    message=slack_msg,
    channel='#airflow_updates',
    username='airflow_'+os.environ['ENVIRONMENT'],
    icon_emoji=None,
    link_names=False,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

注意:确保您已将slack_connectionAirflow 连接添加为

host=https://hooks.slack.com/services/
Run Code Online (Sandbox Code Playgroud)

  • 没问题。它可能对其他人有帮助:) (3认同)