如何强制气流任务失败?

Mas*_*ask 19 python airflow

我有一个python可调process_csv_entries用来处理csv文件条目.只有成功处理完所有条目后,我才希望我的任务成功完成.否则任务应该失败

def process_csv_entries(csv_file):
    # Boolean 
    file_completely_parsed = <call_to_module_to_parse_csv>
    return not file_completely_parsed

CSV_FILE=<Sets path to csv file>
t1 = PythonOperator(dag=dag,
                      task_id='parse_csv_completely',
                      python_operator=process_csv_entries,
                      op_args=[CSV_FILE])
Run Code Online (Sandbox Code Playgroud)

无论返回值如何,t1似乎都成功完成.如何强制PythonOperator任务失败?

off*_*dal 29

如果您想让任务失败而不重试,请使用AirflowFailException:-

例子 :-

from airflow.exceptions import AirflowFailException
def task_to_fail():
    raise AirflowFailException("Our api key is bad!")
Run Code Online (Sandbox Code Playgroud)

如果您正在寻找重试,请使用AirflowException:-

例子:-

from airflow import AirflowException
def task_to_fail():
    raise AirflowException("Error msj")
Run Code Online (Sandbox Code Playgroud)


Pri*_*hta 23

遇到错误条件时引发异常(在您的情况下:文件未被解析时)

raise ValueError('File not parsed completely/correctly')
Run Code Online (Sandbox Code Playgroud)

用适当的消息提出相关的错误类型

  • 这不会强制任务实例状态"失败"但是...有没有办法绕过重试配置? (6认同)
  • 这工作......谢谢!我希望有更好的方法来解决这个问题. (3认同)

Alo*_*nik 14

是的,加注AirflowException,这将导致任务立即移动到失败状态.

from airflow import AirflowException

ValueError 可用于失败和重试.

  • 如果设置了重试,是否可以防止气流重试任务?例如,有一些您不想/不需要重试的错误,例如与输入无效的错误。 (6认同)
  • 任何异常都会使任务失败并将其转入失败状态。 (2认同)
  • @JoeJ,有一个为此开放的 PR:https://github.com/apache/airflow/pull/7133 (2认同)

y2k*_*ham 14

AirflowFailException现在可用于使任务失败而无需重试

  • 对于那些想知道“现在”版本意味着什么的人来说,它是[在 1.10.11 中引入的](https://airflow.apache.org/docs/apache-airflow/stable/changelog.html#airflow-1-10- 11-2020-07-10) (2认同)