Airflow DAG: custom email when any task fails

San*_*ndy 2 airflow

Is there any option to customize the email and send it if any task in the DAG fails? There is an option like 'email_on_failure': True, but it does not provide a way to dynamically add content to the email subject or body.

My DAG looks like this:

import airflow

from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
import json
from datetime import timedelta
from datetime import datetime
from airflow.models import Variable

args = {
    'owner': 'airflow',
    'email': ['test@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'max_active_runs':10
}

dag = DAG(dag_id='TEST_DAG', default_args=args, schedule_interval='@once')

new_cluster = {
    'spark_version': '4.0.x-scala2.11',
    'node_type_id': 'Standard_D16s_v3',
    'num_workers': 3,
    'spark_conf':{
        'spark.hadoop.javax.jdo.option.ConnectionDriverName':'org.postgresql.Driver',
        .....
    },
    'custom_tags':{
        'ApplicationName':'TEST',
        .....
    }
}

t1 = DatabricksSubmitRunOperator(
  task_id='t1',
  dag=dag,
  new_cluster=new_cluster,
  ......
)

t2 = SimpleHttpOperator(
    task_id='t2',
    method='POST',
    ........    
)

t2.set_upstream(t1)

t3 = SimpleHttpOperator(
    task_id='t3',
    method='POST',
   .....
 )

t3.set_upstream(t2)

send_mail = EmailOperator (
    dag=dag,
    task_id="send_mail",
    to=["test@gmail.com"],
    subject=" Success",
    html_content='<h3>Success</h3>')

send_mail.set_upstream(t3)

In the success case, the send_mail task sends a customized email to the specified email IDs.

But if a task fails, I want to customize the email and send it to the specified email IDs. That does not happen; on failure an email is sent with the default subject and body.

Any help would be appreciated.

San*_*ndy 6

I managed it with the help of Airflow TriggerRule. A sample DAG is given below:

import airflow

from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
import json
from datetime import timedelta
from datetime import datetime
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule

args = {
    'owner': 'airflow',
    'email': ['test@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'max_active_runs':10
}

dag = DAG(dag_id='TEST_DAG', default_args=args, schedule_interval='@once')

new_cluster = {
    'spark_version': '4.0.x-scala2.11',
    'node_type_id': 'Standard_D16s_v3',
    'num_workers': 3,
    'spark_conf':{
        'spark.hadoop.javax.jdo.option.ConnectionDriverName':'org.postgresql.Driver',
        .....
    },
    'custom_tags':{
        'ApplicationName':'TEST',
        .....
    }
}

t1 = DatabricksSubmitRunOperator(
  task_id='t1',
  dag=dag,
  new_cluster=new_cluster,
  ......
)

t2 = SimpleHttpOperator(
    task_id='t2',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    method='POST',
    ........
)

t2.set_upstream(t1)

t3 = SimpleHttpOperator(
    task_id='t3',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    method='POST',
   .....
 )

t3.set_upstream(t2)

AllTaskSuccess = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    task_id="AllTaskSuccess",
    to=["test@gmail.com"],
    subject="All Task completed successfully",
    html_content='<h3>All Task completed successfully</h3>')

AllTaskSuccess.set_upstream([t1, t2, t3])

t1Failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="t1Failed",
    to=["test@gmail.com"],
    subject="T1 Failed",
    html_content='<h3>T1 Failed</h3>')

t1Failed.set_upstream([t1])

t2Failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="t2Failed",
    to=["test@gmail.com"],
    subject="T2 Failed",
    html_content='<h3>T2 Failed</h3>')

t2Failed.set_upstream([t2])

t3Failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="t3Failed",
    to=["test@gmail.com"],
    subject="T3 Failed",
    html_content='<h3>T3 Failed</h3>')

t3Failed.set_upstream([t3])

Trigger Rules


Though the normal workflow behavior is to trigger tasks when all their directly upstream tasks have succeeded, Airflow allows for more complex dependency settings.


All operators have a trigger_rule argument which defines the rule by which the generated task gets triggered. The default value for trigger_rule is all_success and can be described as "trigger this task when all directly upstream tasks have succeeded". All other rules listed here are based on direct parent tasks and are values that can be passed to any operator while creating tasks (a condensed usage sketch follows the list):

all_success: (default) all parents have succeeded

all_failed: all parents are in a failed or upstream_failed state

all_done: all parents are done with their execution

one_failed: fires as soon as at least one parent has failed; it does not wait for all parents to be done

one_success: fires as soon as at least one parent succeeds; it does not wait for all parents to be done

dummy: dependencies are just for show, trigger at will

Reference: https://airflow.apache.org/concepts.html
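As a condensed illustration of the same pattern, a single notification task can be attached downstream of several tasks with trigger_rule=TriggerRule.ONE_FAILED, so it fires as soon as any of them fails. This is a minimal sketch, not the DAG from the question; the task names, bash commands, and recipient address are placeholders:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(dag_id='trigger_rule_demo',
          default_args={'owner': 'airflow',
                        'start_date': airflow.utils.dates.days_ago(1)},
          schedule_interval='@once')

step1 = BashOperator(task_id='step1', bash_command='exit 0', dag=dag)
step2 = BashOperator(task_id='step2', bash_command='exit 1', dag=dag)  # deliberately fails

# Fires as soon as any direct upstream task fails, without waiting for the rest.
any_failed_mail = EmailOperator(
    dag=dag,
    task_id='any_failed_mail',
    trigger_rule=TriggerRule.ONE_FAILED,
    to=['test@gmail.com'],
    subject='trigger_rule_demo: a task failed',
    html_content='<h3>At least one upstream task failed</h3>')

any_failed_mail.set_upstream([step1, step2])

Because it uses ONE_FAILED, the notification task does not run when every upstream task succeeds.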

  • I wouldn't recommend going this route. (3 upvotes)

Jus*_*zas 5

I am using on_failure_callback for this. Note that it will be triggered for every failed task in the DAG.

import logging

from airflow.operators.email_operator import EmailOperator


def report_failure(context):
    # The failed task's metadata is available in the callback context.
    task_instance = context['task_instance']
    dag_id = task_instance.dag_id

    # Include this check (and the xcom_push below) if you only want one email per DAG run.
    if task_instance.xcom_pull(task_ids=None, dag_id=dag_id, key=dag_id):
        logging.info("Another failing task has already been notified.")
        return
    task_instance.xcom_push(key=dag_id, value=True)

    send_email = EmailOperator(...)
    send_email.execute(context)


dag = DAG(
    ...,
    default_args={
        ...,
        "on_failure_callback": report_failure
    }
)

  • Airflow will pass the task information to on_failure_callback as part of the context. (2 upvotes)
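Building on that comment, the callback context can be used to make the subject and body dynamic, which is what the question asks for. A minimal sketch under a few assumptions: the recipient address is a placeholder, the function name send_custom_failure_mail is made up for illustration, and it uses send_email from airflow.utils.email (the helper the EmailOperator relies on):

from airflow.utils.email import send_email


def send_custom_failure_mail(context):
    # Pull details about the failed task out of the callback context.
    task_instance = context['task_instance']
    subject = "[Airflow] DAG {dag} failed on task {task}".format(
        dag=task_instance.dag_id, task=task_instance.task_id)
    body = (
        "<h3>Task failure</h3>"
        "<p>DAG: {dag}<br>"
        "Task: {task}<br>"
        "Execution date: {ds}<br>"
        "Log URL: <a href='{url}'>{url}</a></p>"
    ).format(dag=task_instance.dag_id,
             task=task_instance.task_id,
             ds=context['execution_date'],
             url=task_instance.log_url)
    send_email(to=['test@gmail.com'], subject=subject, html_content=body)

Register it through "on_failure_callback" in default_args the same way as report_failure above; to avoid also receiving the stock notification, 'email_on_failure' can be set to False.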