Is there any option to customize the email and send it when any task in the DAG fails? There is an option like 'email_on_failure': True, but it does not provide a way to dynamically add content to the email subject or body.
My DAG looks like this:
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
import json
from datetime import timedelta
from datetime import datetime
from airflow.models import Variable
args = {
    'owner': 'airflow',
    'email': ['test@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'max_active_runs': 10
}

dag = DAG(dag_id='TEST_DAG', default_args=args, schedule_interval='@once')

new_cluster = {
    'spark_version': '4.0.x-scala2.11',
    'node_type_id': 'Standard_D16s_v3',
    'num_workers': 3,
    'spark_conf': {
        'spark.hadoop.javax.jdo.option.ConnectionDriverName': 'org.postgresql.Driver',
        .....
    },
    'custom_tags': {
        'ApplicationName': 'TEST',
        .....
    }
}

t1 = DatabricksSubmitRunOperator(
    task_id='t1',
    dag=dag,
    new_cluster=new_cluster,
    ......
)

t2 = SimpleHttpOperator(
    task_id='t2',
    method='POST',
    ........
)

t2.set_upstream(t1)

t3 = SimpleHttpOperator(
    task_id='t3',
    method='POST',
    .....
)

t3.set_upstream(t2)

send_mail = EmailOperator(
    dag=dag,
    task_id="send_mail",
    to=["test@gmail.com"],
    subject="Success",
    html_content='<h3>Success</h3>')

send_mail.set_upstream(t3)
In the success case the send_mail task sends a customized email to the specified email ID.
But if a task fails, I want to customize the email and send it to the specified email ID as well. That does not happen; on failure the email goes out with the default subject and body.
Any help would be appreciated.
I managed it with the help of Airflow's TriggerRule; a sample DAG is given below:
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
import json
from datetime import timedelta
from datetime import datetime
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule

args = {
    'owner': 'airflow',
    'email': ['test@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'max_active_runs': 10
}

dag = DAG(dag_id='TEST_DAG', default_args=args, schedule_interval='@once')

new_cluster = {
    'spark_version': '4.0.x-scala2.11',
    'node_type_id': 'Standard_D16s_v3',
    'num_workers': 3,
    'spark_conf': {
        'spark.hadoop.javax.jdo.option.ConnectionDriverName': 'org.postgresql.Driver',
        .....
    },
    'custom_tags': {
        'ApplicationName': 'TEST',
        .....
    }
}

t1 = DatabricksSubmitRunOperator(
    task_id='t1',
    dag=dag,
    new_cluster=new_cluster,
    ......
)

t2 = SimpleHttpOperator(
    task_id='t2',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    method='POST',
    ........
)

t2.set_upstream(t1)

t3 = SimpleHttpOperator(
    task_id='t3',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    method='POST',
    .....
)

t3.set_upstream(t2)

AllTaskSuccess = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    task_id="AllTaskSuccess",
    to=["test@gmail.com"],
    subject="All Task completed successfully",
    html_content='<h3>All Task completed successfully</h3>')

AllTaskSuccess.set_upstream([t1, t2, t3])

t1Failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="t1Failed",
    to=["test@gmail.com"],
    subject="T1 Failed",
    html_content='<h3>T1 Failed</h3>')

t1Failed.set_upstream([t1])

t2Failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="t2Failed",
    to=["test@gmail.com"],
    subject="T2 Failed",
    html_content='<h3>T2 Failed</h3>')

t2Failed.set_upstream([t2])

t3Failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="t3Failed",
    to=["test@gmail.com"],
    subject="T3 Failed",
    html_content='<h3>T3 Failed</h3>')

t3Failed.set_upstream([t3])

Trigger Rules
While the normal workflow behavior is to trigger tasks when all their directly upstream tasks have succeeded, Airflow allows for more complex dependency settings.

All operators have a trigger_rule argument which defines the rule by which the generated task gets triggered. The default value for trigger_rule is all_success and can be defined as "trigger this task when all directly upstream tasks have succeeded". All other rules described here are based on direct parent tasks and are values that can be passed to any operator while creating tasks:

all_success: (default) all parents have succeeded
all_failed: all parents are in a failed or upstream_failed state
all_done: all parents are done with their execution
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
dummy: dependencies are just for show, trigger at will

Reference: https://airflow.apache.org/concepts.html
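As a minimal sketch of the same idea, here is a hypothetical DAG (DummyOperator stands in for the real work, and all names are made up) that only shows where trigger_rule goes; with the default all_success rule the notification task would be skipped instead of fired when its parent fails:

import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

example_dag = DAG(
    dag_id='trigger_rule_example',
    default_args={'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(1)},
    schedule_interval='@once')

# placeholder for the real task being monitored
work = DummyOperator(task_id='work', dag=example_dag)

# one_failed: fires as soon as the upstream 'work' task fails,
# so this email is only sent on failure of its parent
work_failed = EmailOperator(
    dag=example_dag,
    task_id='work_failed',
    trigger_rule=TriggerRule.ONE_FAILED,
    to=['test@gmail.com'],
    subject='work failed',
    html_content='<h3>work failed</h3>')

work_failed.set_upstream(work)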
I am using on_failure_callback for this. Note that it will be triggered for every failed task in the DAG.
import logging
from airflow.operators.email_operator import EmailOperator

def report_failure(context):
    # the failure callback receives the task instance and the DAG through the context dict
    task_instance = context['task_instance']
    dag_id = context['dag'].dag_id

    # include this check if you only want to get one email per DAG run
    if task_instance.xcom_pull(task_ids=None, dag_id=dag_id, key=dag_id) == True:
        logging.info("Other failing task has been notified.")
        return
    # remember that a notification has already gone out for this DAG run
    task_instance.xcom_push(key=dag_id, value=True)

    send_email = EmailOperator(...)
    send_email.execute(context)

dag = DAG(
    ...,
    default_args={
        ...,
        "on_failure_callback": report_failure
    }
)
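Since the original question was about putting dynamic content into the failure email, here is a sketch of such a callback. It assumes Airflow's airflow.utils.email.send_email helper and the usual keys carried by the failure-callback context (task_instance, execution_date, exception); the function name report_failure_with_details and the recipient address are just illustrative:

import logging
from airflow.utils.email import send_email

def report_failure_with_details(context):
    # build the subject and body from the failed task's context
    task_instance = context['task_instance']
    subject = "[airflow] DAG {} failed on task {}".format(
        task_instance.dag_id, task_instance.task_id)
    html_content = "<h3>{}</h3><p>Execution date: {}</p><pre>{}</pre>".format(
        subject, context['execution_date'], context.get('exception'))

    send_email(to=["test@gmail.com"], subject=subject, html_content=html_content)
    logging.info("Failure notification sent for %s", task_instance.task_id)

Wired into default_args via on_failure_callback exactly as above, this sends one email per failed task with a subject and body that vary with the failing task.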