有没有办法为同一 DAG 中的任务配置不同的“重试”

Alb*_*bin 3 airflow

我有一个包含许多子任务的 DAG。在 DAG 的中间,有一个验证任务,根据任务的结果/返回代码,我想采用两条不同的路径。如果成功,将遵循一条路线(一系列任务),如果失败,我们将执行一组不同的任务。当前方法有两个问题,一是如果退出代码为 1,验证任务将执行多次(根据配置的重试次数)。二是无法采取不同的执行分支

为了解决问题 1,我们可以使用任务实例中可用的重试次数,它可以通过宏 {{ task_instance }} 获得。感谢有人可以为我们指出更简洁的方法,而采取不同路径的问题 2 仍未解决。

kax*_*xil 9

您可以retries在任务级别拥有。

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    retries=3,
    dag=dag,
)

run_this_last = DummyOperator(
    task_id='run_this_last',
    retries=1,
    dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

关于你的第二个问题,有一个Branching的概念。

在此处输入图片说明

BranchPythonOperator很像PythonOperator只是它需要一个python_callable一个返回TASK_ID(或task_ids的列表)。遵循返回的 task_id,并跳过所有其他路径。Python 函数返回的 task_id 必须直接引用任务下游的BranchPythonOperator任务。

示例 DAG:

import random

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_branch_operator',
    default_args=args,
    schedule_interval="@daily",
)

run_this_first = DummyOperator(
    task_id='run_this_first',
    dag=dag,
)

options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=lambda: random.choice(options),
    dag=dag,
)
run_this_first >> branching

join = DummyOperator(
    task_id='join',
    trigger_rule='one_success',
    dag=dag,
)

for option in options:
    t = DummyOperator(
        task_id=option,
        dag=dag,
    )

    dummy_follow = DummyOperator(
        task_id='follow_' + option,
        dag=dag,
    )

    branching >> t >> dummy_follow >> join
Run Code Online (Sandbox Code Playgroud)


Tom*_*mme 1

关于您的第一个问题,您可以很轻松地设置任务/操作员特定的重试选项。参考:baseoperator.py#L77

问题二,您可以使用 DAG 轻松分支BranchPythonOperator(示例用法:example_branch_operator.py)。您将希望将验证任务/逻辑嵌套在BranchPythonOperator(您可以在运算符中定义和执行运算符)。