在气流中,出现故障时,有没有办法重复一组任务?

Dan*_*lan 7 python airflow

在我的 DAG 中,我的任务流程如下:

... >> EmrAddStepsOperator >> EmrStepSensor
Run Code Online (Sandbox Code Playgroud)

EmrAddStepsOperator 的成功意味着“我能够告诉 EMR 启动”。EmrStepSensor 失败意味着“EMR 任务出现问题”。我对这些描述可能有点偏差,但这与我想要表达的观点无关:

如果第二个任务失败,我想重新启动第一个任务,而不是第二个任务。当第二个任务失败时,如何让气流重新启动第一个任务?

Ber*_*dan 5

基于on_retry_callback参数的解决方案

编辑:我将此解决方案用于 Airflow 版本2.0.1。正如 @obayram 所说,activate_dag_runs参数 in在版本2.1.1clear_task_instances中已弃用。

您可以将clear_task_instances内置模块中的函数airflow.models.taskinstance与运算符中的参数结合起来,在当前任务失败on_retry_callback时重试最后n个任务

您只需将以下 python 代码添加到 DAG 文件中即可:

from airflow.models.taskinstance import clear_task_instances
from airflow.utils.db import provide_session

@provide_session
def retry_upstream_tasks(context, session = None, adr = False):
    task_ids_to_retry = []
    j, a_task = 0, context['task']
    while j < context['params']['retry_upstream_depth']:
        num_upstream_tasks = len(a_task.upstream_task_ids)
        if num_upstream_tasks != 1:
            raise ValueError(f'The # of upstream tasks of "{a_task}" must be 1, but "{num_upstream_tasks}"')
        upstream_task_id = list(a_task.upstream_task_ids)[0]
        task_ids_to_retry.append(upstream_task_id)
        upstream_task = [t for t in context['dag'].tasks if t.task_id == upstream_task_id][0]
        a_task = upstream_task
        j += 1

    all_task_ids_to_instances = {t_ins.task_id: t_ins for t_ins in context['dag_run'].get_task_instances()}
    task_instances_to_retry = [all_task_ids_to_instances[tid] for tid in task_ids_to_retry[::-1]]

    clear_task_instances(tis = task_instances_to_retry, session = session, activate_dag_runs = adr, dag = context['dag'])

task_depends_on_previous_tasks = ANY_OPERATOR( # You can apply this to any operator.
            task_id='task_depends_on_previous_tasks',
            ...
            on_retry_callback=retry_upstream_tasks,
            retries=3,
            params={'retry_upstream_depth': 2} # You can change the depth
        )
Run Code Online (Sandbox Code Playgroud)

{'retry_upstream_depth': n}将值传递给params任务操作员的参数。您可以更改n来控制在当前任务之前要重试的任务数。

例如

您的任务顺序如下:

task_1 >> task_2 >> task_depends_on_previous_tasks
Run Code Online (Sandbox Code Playgroud)

并且您希望在失败task_1时按顺序重试。task_2task_depends_on_previous_tasks

那么,你应该设置retry_upstream_depth2

重要笔记

  • 在这种情况下,将重试的任务(第一个/最旧的任务除外)应该只有一个上游任务,即这些任务应该在一条直线上。

  • 重试次数受当前任务retries中的参数限制。因此,如果,当前任务最多可以失败 3 次,并且每次重试时,都会在触发当前任务之前触发前n 个任务retries=3

  • 这个解决方案对我有用。然而,从 Airflow 2.1.1 开始,“clear_task_instances”函数的“active_dag_run”参数已被弃用,因此我省略了该参数。 (2认同)

小智 1

我想您可以轻松地将两个运算符放入 subdag 运算符中,并在 subdag 运算符上设置重试参数。