我正在使用SlackAPIPostOperatorAirflow在任务失败时发送Slack消息。我想知道是否存在一种将失败任务的气流UI日志页面的链接添加到松弛消息的聪明方法。
以下是我要实现的示例:
当前消息是:
def slack_failed_task(context):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']))
return failed_alert.execute(context=context)
Run Code Online (Sandbox Code Playgroud)
您可以使用base_url本[webserver]节下的config值来构建UI的url ,然后使用Slack的消息格式 <http://example.com|stuff>进行链接。
from airflow import configuration
def slack_failed_task(context):
link = '<{base_url}/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}|logs>'.format(
base_url=configuration.get('webserver', 'BASE_URL'),
dag_id=context['dag'].dag_id,
task_id=context['task_instance'].task_id,
execution_date=context['ts'])) # equal to context['execution_date'].isoformat())
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']) +
'\nSee ' + link + ' to debug')
return failed_alert.execute(context=context)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1404 次 |
| 最近记录: |