san*_*sar 2 python airflow airflow-taskflow
我在 Airflow 的 TaskFlowAPI 中找不到分支的文档。我尝试以“Pythonic”方式进行操作,但是当运行时,DAG 没有看到task_2_execute_if_true,无论前一个任务返回的真值如何。
@dag(
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['test'],
)
def my_dag():
@task()
def task_1_returns_boolean():
# evaluate and return boolean value
return boolean_value
@task()
def task_2_execute_if_true():
# do_something...
outcome_1 = task_1_returns_boolean()
if outcome_1:
outcome_2 = task_2_execute_if_true()
executed = my_dag()
Run Code Online (Sandbox Code Playgroud)
TaskFlowAPI 中正确的分支方式是什么?我应该再添加一个专门用于分支的函数吗?
语法是:
from airflow.decorators import task
@task.branch(task_id="branching_task_id")
def random_choice():
return "task_id_to_run"
Run Code Online (Sandbox Code Playgroud)
它是在 Airflow 2.3.0 中引入的。