tsa*_*sch 9 python airflow google-cloud-composer
仅针对某些故障/异常重试 Airflow 运算符的最佳方法是什么?
例如,假设我有一个 Airflow 任务,该任务依赖于外部服务的可用性。如果该服务在任务执行期间变得不可用,我想稍后重试(最多重试 3 次)。对于其他失败我不想重试。
我当前的方法是通过解析来使用on_failure_callback和操作context["ti"].task.retries所需的异常context["exception"],但我认为这很混乱且难以理解。有更好的选择吗?
大多数airflow的操作符都使用一个Hook类来完成工作。
如果您可以创建自己的PythonOperator异常并尝试/捕获您想要避免的异常并抛出您想要触发重试的异常,它将无缝地符合气流架构:
# python operator function
def my_operation():
try:
hook = SomeHook()
hook.use_it()
except IgnorableException as e:
pass
# then:
my_operator = PythonOperator(
task_id='my-operator',
python_callable=my_operation
)
Run Code Online (Sandbox Code Playgroud)
它使您可以更好地控制 Operator 和 DAG 生命周期。