Airflow :使用分支跳过任务

Ash*_*iya 3 airflow

在我的 DAG 中想要跳过依赖于标志的任务 (oracle_merge_hist_orig)。

我的逻辑是:

当 oracle_branch=True 时执行 [merge_op,update_table_op,table_count_op]

当oracle_branch=False时执行[update_table_op, table_count_op]

我尝试按如下方式使用 BranchPythonOperator:

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
oracle_branch = True
def branch_func():
    if oracle_branch:
        return "oracle_branch"
    else:
        return "normal_branch"

dag = DAG(
    dag_id='example_branch_operator',
    default_args=args,
    schedule_interval="@daily",
)

branching_op = BranchPythonOperator(
    task_id='branch_shall_run_oracle_merge_original_hist',
    python_callable=branch_func,
    dag= dag)

oracle_branch = DummyOperator(
    task_id='oracle_branch',
    dag=dag)

normal_branch = DummyOperator(
    task_id='normal_branch',
    dag=dag)

merge_op = DummyOperator(
    task_id='oracle_merge_hist_orig',
    dag=dag,
)

update_table_op = DummyOperator(
    task_id='update_table_job',
    dag=dag,
)

table_count_op = DummyOperator(
    task_id='table_count',
    dag=dag,
)

branching_op >> [oracle_branch,normal_branch] 
normal_branch >> update_table_op >> table_count_op
oracle_branch >> merge_op >> update_table_op >> table_count_op
Run Code Online (Sandbox Code Playgroud)

但是,它不是跳过任务,而是跳过整个路径。

如何解决这个问题,以便我只跳过“racle_merge_hist_orig”任务?

当oracle_branch=False时 在此输入图像描述

当 oracle_branch=True 时 在此输入图像描述

Sai*_*tam 11

每个任务都会有一个默认trigger_rule设置。我们可以将其覆盖为此处all_success列出的不同值。

在您的 DAG 中,该update_table_job任务有两个上游任务。由于其上游任务之一处于skipped状态,因此它也进入skipped状态。trigger_rule我们可以通过覆盖默认值来避免这种情况,one_success如下所示。

update_table_op = DummyOperator(
    task_id='update_table_job',
    trigger_rule='one_success',
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述 注意:我在 Airflow 1.10.4 版本上对此进行了测试。