在我的 DAG 中,我的任务流程如下:
... >> EmrAddStepsOperator >> EmrStepSensor
Run Code Online (Sandbox Code Playgroud)
EmrAddStepsOperator 的成功意味着“我能够告诉 EMR 启动”。EmrStepSensor 失败意味着“EMR 任务出现问题”。我对这些描述可能有点偏差,但这与我想要表达的观点无关:
如果第二个任务失败,我想重新启动第一个任务,而不是第二个任务。当第二个任务失败时,如何让气流重新启动第一个任务?
on_retry_callback参数的解决方案编辑:我将此解决方案用于 Airflow 版本2.0.1。正如 @obayram 所说,activate_dag_runs参数 in在版本2.1.1clear_task_instances中已弃用。
您可以将clear_task_instances内置模块中的函数airflow.models.taskinstance与运算符中的参数结合起来,在当前任务失败on_retry_callback时重试最后n个任务。
您只需将以下 python 代码添加到 DAG 文件中即可:
from airflow.models.taskinstance import clear_task_instances
from airflow.utils.db import provide_session
@provide_session
def retry_upstream_tasks(context, session = None, adr = False):
task_ids_to_retry = []
j, a_task = 0, context['task']
while j < context['params']['retry_upstream_depth']:
num_upstream_tasks = len(a_task.upstream_task_ids)
if num_upstream_tasks != 1:
raise ValueError(f'The # of upstream tasks of "{a_task}" must be 1, but "{num_upstream_tasks}"')
upstream_task_id = list(a_task.upstream_task_ids)[0]
task_ids_to_retry.append(upstream_task_id)
upstream_task = [t for t in context['dag'].tasks if t.task_id == upstream_task_id][0]
a_task = upstream_task
j += 1
all_task_ids_to_instances = {t_ins.task_id: t_ins for t_ins in context['dag_run'].get_task_instances()}
task_instances_to_retry = [all_task_ids_to_instances[tid] for tid in task_ids_to_retry[::-1]]
clear_task_instances(tis = task_instances_to_retry, session = session, activate_dag_runs = adr, dag = context['dag'])
task_depends_on_previous_tasks = ANY_OPERATOR( # You can apply this to any operator.
task_id='task_depends_on_previous_tasks',
...
on_retry_callback=retry_upstream_tasks,
retries=3,
params={'retry_upstream_depth': 2} # You can change the depth
)
Run Code Online (Sandbox Code Playgroud)
{'retry_upstream_depth': n}将值传递给params任务操作员的参数。您可以更改n来控制在当前任务之前要重试的任务数。
您的任务顺序如下:
task_1 >> task_2 >> task_depends_on_previous_tasks
Run Code Online (Sandbox Code Playgroud)
并且您希望在失败task_1时按顺序重试。task_2task_depends_on_previous_tasks
那么,你应该设置retry_upstream_depth为2。
在这种情况下,将重试的任务(第一个/最旧的任务除外)应该只有一个上游任务,即这些任务应该在一条直线上。
重试次数受当前任务retries中的参数限制。因此,如果,当前任务最多可以失败 3 次,并且每次重试时,都会在触发当前任务之前触发前n 个任务。retries=3