我试图通过传入一个thisshouldnotrun不起作用的Bash行()来故意使一个Airflow任务失败并输出错误.气流输出如下:
[2017-06-15 17:44:17,869] {bash_operator.py:94} INFO - /tmp/airflowtmpLFTMX7/run_bashm2MEsS: line 7: thisshouldnotrun: command not found
[2017-06-15 17:44:17,869] {bash_operator.py:97} INFO - Command exited with return code 127
[2017-06-15 17:44:17,869] {models.py:1417} ERROR - Bash command failed
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2017-06-15 17:44:17,871] {models.py:1433} INFO - Marking task as UP_FOR_RETRY
[2017-06-15 17:44:17,878] {models.py:1462} ERROR - Bash command failed
Traceback …Run Code Online (Sandbox Code Playgroud) 如何配置Airflow,以便DAG中的任何故障都会(立即)导致松弛消息?
此时我通过创建slack_failed_task来管理它:
slack_failed_task = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
trigger_rule='one_failed',
token="...",
text = ':red_circle: DAG Failed',
icon_url = 'http://airbnb.io/img/projects/airflow3.png',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
并在DAG中的每个其他任务的上游设置此任务(one_failed):
slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
Run Code Online (Sandbox Code Playgroud)
它有效,但它容易出错,因为忘记添加任务会跳过松弛通知,看起来很多工作.
是否有可能扩展email_on_failureDAG中的财产?
奖金;-)包括将失败任务的名称传递给消息的方法.