Zac*_*ack 2 amazon-web-services amazon-ses airflow
每当DAG中的任务运行失败或重试运行时,我都试图让Airflow使用AWS SES向我发送电子邮件。我也在使用我的AWS SES凭证,而不是我的一般AWS凭证。
我当前的airflow.cfg
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com
Run Code Online (Sandbox Code Playgroud)
我的DAG中当前旨在故意失败并重试的任务:
testfaildag_library_install_jar_jdbc = PythonOperator(
task_id='library_install_jar',
retries=3,
retry_delay=timedelta(seconds=15),
python_callable=add_library_to_cluster,
params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path':s3000://fakepath.jar},
dag=dag,
email_on_failure=True,
email_on_retry=True,
email=’myname@myjob.com’,
provide_context=True
)
Run Code Online (Sandbox Code Playgroud)
当任务重试设定的次数并最终失败时,一切都会按设计进行,除非没有发送电子邮件。我也检查了上述任务中的日志,并且从未提及过smtp。
我在这里看过类似的问题,但是那里唯一的解决方案对我不起作用。此外,Airflow的文档(例如此处的示例)似乎也不适合我。
SES是否可以与Airflow的email_on_failure和email_on_retry函数一起使用?
我目前正在考虑使用此on_failure_callback函数来调用AWS 此处提供的python脚本,以在发生故障时发送电子邮件,但这不是此时的首选方法。
谢谢您,谢谢您的帮助。
-使用有效的SES更新6/8
这是我写的有关如何使它们全部工作的文章。该答案的底部有一个小摘要。
几个要点:
email_on_failureand email_on_retry功能提供服务。您可以journalctl –u airflow-worker –f在Dag运行期间进行监视。在生产服务器上,airflow.cfg使用新的smtp设置进行更改后,您无需重新启动airflow- worker-它应该会自动提取。无需担心搞乱当前正在运行的Dags。这是有关如何使用sendmail的技术文章:
由于我们将localhost上的ses更改为sendmail,因此必须在中更改smtp设置airflow.cfg。
新的配置是:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from = myjob@mywork.com
Run Code Online (Sandbox Code Playgroud)
这适用于生产和局部气流实例。
如果它们的配置不像上面的我的配置可能会收到一些常见的错误:
socket.error: [Errno 111] Connection refused-您必须将smtp_host行更改airflow.cfg为localhostsmtplib.SMTPException: STARTTLS extension not supported by server.-您必须改变smtp_starttls在airflow.cfg以False在我的本地测试中,我试图简单地强制气流显示尝试发送电子邮件时发生的情况的日志–我创建了一个伪造的dag,如下所示:
# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
# General imports
from datetime import datetime,timedelta
def throwerror():
raise ValueError("Failure")
SPARK_V_2_2_1 = '3.5.x-scala2.11'
args = {
'owner': ‘me’,
'email': ['me@myjob'],
'depends_on_past': False,
'start_date': datetime(2018, 5,24),
'end_date':datetime(2018,6,28)
}
dag = DAG(
dag_id='testemaildag',
default_args=args,
catchup=False,
schedule_interval="* 18 * * *"
)
t1 = DummyOperator(
task_id='extract_data',
dag=dag
)
t2 = PythonOperator(
task_id='fail_task',
dag=dag,
python_callable=throwerror
)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)
如果您执行journalctl -u airflow-worker -f,您会看到工作人员说它已向DAG中的电子邮件发送了有关失败的警报电子邮件,但我们仍未收到该电子邮件。然后,我们决定通过执行以下操作来查看sendmail的邮件日志cat /var/log/maillog。我们看到了这样的日志:
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<myjob@mycompany.com>, size=1297, nrcpt=1 (queue active)
Jun 5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun 5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Run Code Online (Sandbox Code Playgroud)
因此,这可能是最大的“噢”时刻。在这里,我们可以看到smtp服务中实际发生的情况。我们使用telnet确认无法连接到gmail的目标IP范围。
我们确定电子邮件正在尝试发送,但是sendmail服务无法成功连接到ip范围。
我们决定允许AWS的端口25上的所有出站流量(因为我们的气流生产环境是ec2实例),现在它可以成功运行。现在,我们可以接收有关失败和重试的电子邮件(提示:email_on_failure并且email_on_retry默认情况下,如TrueDAG API参考中所述 -如果您不想这样做,则无需将其放入args中,但是明确声明仍然是一种好习惯正确或错误)。
SES现在可以使用。这是气流配置:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
smtp_mail_from = myemail@myjob.com (Verified SES email)
Run Code Online (Sandbox Code Playgroud)
谢谢!
| 归档时间: |
|
| 查看次数: |
2514 次 |
| 最近记录: |