使用 TaskFlowAPI 在 Apache Airflow 中进行分支

san*_*sar 2 python airflow airflow-taskflow

我在 Airflow 的 TaskFlowAPI 中找不到分支的文档。我尝试以“Pythonic”方式进行操作,但是当运行时,DAG 没有看到task_2_execute_if_true,无论前一个任务返回的真值如何。

@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['test'],
)
def my_dag():
    @task()
    def task_1_returns_boolean():
        # evaluate and return boolean value
        return boolean_value
    
    @task()
    def task_2_execute_if_true():
        # do_something...

    outcome_1 = task_1_returns_boolean()
    if outcome_1:
        outcome_2 = task_2_execute_if_true() 


executed = my_dag()
Run Code Online (Sandbox Code Playgroud)

TaskFlowAPI 中正确的分支方式是什么?我应该再添加一个专门用于分支的函数吗?

Bas*_*lak 5

源代码中有一个示例 DAG:https://github.com/apache/airflow/blob/f1a9a9e3727443ffba496de9b9650322fdc98c5f/airflow/example_dags/example_branch_operator_decorator.py#L43

语法是:

from airflow.decorators import task

@task.branch(task_id="branching_task_id")
def random_choice():
    return "task_id_to_run"
Run Code Online (Sandbox Code Playgroud)

它是在 Airflow 2.3.0 中引入的。

  • 由于某种原因我没有意识到这一点,并且我广泛使用了 TaskFlow API。是否有所有 @task 方法的列表? (2认同)