如何在Airflow中创建条件任务

Ale*_*and 43 python conditional-statements airflow

我想在Airflow中创建一个条件任务,如下面的架构中所述.预期的情况如下:

  • 任务1执行
  • 如果任务1成功,则执行任务2a
  • 否则,如果任务1失败,则执行任务2b
  • 最后执行任务3

条件任务 上面的所有任务都是SSHExecuteOperator.我猜我应该使用ShortCircuitOperator和/或XCom来管理这个条件,但我不知道如何实现它.你能描述一下解决方案吗?

vil*_*asv 47

Airflow有一个BranchPythonOperator,可以用来更直接地表达分支依赖.

文档描述了它的用法:

BranchPythonOperator与PythonOperator非常相似,只是它需要一个返回task_id的python_callable.返回返回的task_id,并跳过所有其他路径.Python函数返回的task_id必须直接引用BranchPythonOperator任务下游的任务.

...

如果你想跳过一些任务,请记住你不能有一个空路径,如果是这样,那就做一个虚拟任务.

代码示例

def dummy_test():
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)

branch_task >> A_task 
branch_task >> B_task
Run Code Online (Sandbox Code Playgroud)

编辑:

如果您在1.10.1 版本之后(但不包括)安装Airflow版本,您还可以返回任务ID列表,允许您在单个Operator中跳过多个下游路径:

  • @mr4kino 哎呀看起来它被推迟到 1.10.3,我发表评论太早了 ;-) 将更新答案,谢谢。 (2认同)

Jea*_*n S 39

您必须使用气流触发规则

所有运算符都有一个trigger_rule参数,该参数定义生成的任务被触发的规则.

触发规则的可能性:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
Run Code Online (Sandbox Code Playgroud)

以下是解决问题的想法:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)


task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
Run Code Online (Sandbox Code Playgroud)

  • 从此处的 Airflow 文档 [链接](https://airflow.incubator.apache.org/concepts.html#trigger-rules) 我确认“one_success:只要 ** 至少一个父级** 成功,** 就会触发,**它不会等待所有父母都完成**”......我会尝试使用 ALL_DONE!谢谢 (3认同)
  • 失败似乎有点过于宽泛.任务可能由于各种原因(例如网络或DNS问题)而失败,然后触发错误的下游任务.有没有办法用两种不同的下游选项定义两种或更多种不同类型的成功?例如文件存在做一个,文件不存在吗?文件传感器似乎不是正确的答案,因为在所有重试之后,失败可能是出于其他原因. (3认同)
  • 嗨!你试图改变:trigger_rule = TriggerRule.ONE_SUCCESS by Trig_rule = TriggerRule.ALL_DONE在TASK 3中?您确定您的任务同时执行吗?(尝试在T2A中设置睡眠功能以进行健全性检查) (2认同)
  • 对于寻找新触发规则文档 (Airflow 2.1+) 的其他人,您可以在这里找到它:[触发规则](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html #触发规则) (2认同)