Tom*_*ous 2 slack airflow apache-airflow
如何配置Airflow,以便DAG中的任何故障都会(立即)导致松弛消息?
此时我通过创建slack_failed_task来管理它:
slack_failed_task = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
trigger_rule='one_failed',
token="...",
text = ':red_circle: DAG Failed',
icon_url = 'http://airbnb.io/img/projects/airflow3.png',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
并在DAG中的每个其他任务的上游设置此任务(one_failed):
slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
Run Code Online (Sandbox Code Playgroud)
它有效,但它容易出错,因为忘记添加任务会跳过松弛通知,看起来很多工作.
是否有可能扩展email_on_failure
DAG中的财产?
奖金;-)包括将失败任务的名称传递给消息的方法.
小智 15
也许这个例子会有所帮助:
def slack_failed_task(contextDictionary, **kwargs):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
token="...",
text = ':red_circle: DAG Failed',
owner = '_owner',)
return failed_alert.execute
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
尝试Airflow 版本 >=1.10.0 中的新SlackWebhookOperator
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
slack_msg="Hi Wssup?"
slack_test = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='slack_connection',
webhook_token='/1234/abcd',
message=slack_msg,
channel='#airflow_updates',
username='airflow_'+os.environ['ENVIRONMENT'],
icon_emoji=None,
link_names=False,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
注意:确保您已将slack_connection
Airflow 连接添加为
host=https://hooks.slack.com/services/
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6396 次 |
最近记录: |