仅在某些异常时重试 Airflow 任务实例

tsa*_*sch 9 python airflow google-cloud-composer

仅针对某些故障/异常重试 Airflow 运算符的最佳方法是什么?

例如,假设我有一个 Airflow 任务,该任务依赖于外部服务的可用性。如果该服务在任务执行期间变得不可用,我想稍后重试(最多重试 3 次)。对于其他失败我不想重试。

我当前的方法是通过解析来使用on_failure_callback和操作context["ti"].task.retries所需的异常context["exception"],但我认为这很混乱且难以理解。有更好的选择吗?

Ala*_*ato 3

大多数airflow的操作符都使用一个Hook类来完成工作。

如果您可以创建自己的PythonOperator异常并尝试/捕获您想要避免的异常并抛出您想要触发重试的异常,它将无缝地符合气流架构:

# python operator function
def my_operation():
    try:
        hook = SomeHook()
        hook.use_it()
    except IgnorableException as e:
        pass


# then:
my_operator = PythonOperator(
    task_id='my-operator',
    python_callable=my_operation
)
Run Code Online (Sandbox Code Playgroud)

它使您可以更好地控制 Operator 和 DAG 生命周期。