SlackAPIPostOperator中的气流UI链接?

Avi*_*ron 6 slack airflow

我正在使用SlackAPIPostOperatorAirflow在任务失败时发送Slack消息。我想知道是否存在一种将失败任务的气流UI日志页面的链接添加到松弛消息的聪明方法。

以下是我要实现的示例:

http://myserver-uw1.myaws.com:8080/admin/airflow/graph?execution_date=...&arrange=LR&root=&dag_id=MyDAG&_csrf_token=mytoken

当前消息是:

def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']))
    return failed_alert.execute(context=context)
Run Code Online (Sandbox Code Playgroud)

Dan*_*ang 6

您可以使用base_url[webserver]节下的config值来构建UI的url ,然后使用Slack的消息格式 <http://example.com|stuff>进行链接。

from airflow import configuration

def slack_failed_task(context):
    link = '<{base_url}/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}|logs>'.format(
        base_url=configuration.get('webserver', 'BASE_URL'),
        dag_id=context['dag'].dag_id,
        task_id=context['task_instance'].task_id,
        execution_date=context['ts']))  # equal to context['execution_date'].isoformat())

    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']) +
             '\nSee ' + link + ' to debug')
    return failed_alert.execute(context=context)
Run Code Online (Sandbox Code Playgroud)