小编ABa*_*ron的帖子

在 BranchPython Operator 之后,Airflow 2.0 任务被跳过

我正在新版本中的 Airflow 中摆弄分支,无论我尝试什么,BranchOperator 之后的所有任务都会被跳过。

这是我一直在努力完成的一个最小的例子

from airflow.decorators import dag, task
from datetime import timedelta, datetime

from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

import logging
logger = logging.getLogger("airflow.task")

@dag(
    schedule_interval="0 0 * * *",
    start_date=datetime.today() - timedelta(days=2),
    dagrun_timeout=timedelta(minutes=60),
)
def StackOverflowExample():

    @task
    def task_A():

        logging.info("TASK A")
        

    @task
    def task_B():

        logging.info("TASK B")

    @task
    def task_C():

        logging.info("TASK C")

    @task
    def task_D():
        
        logging.info("TASK D")

        return {"parameter":0.5}

    
    def _choose_task(task_parameters,**kwargs):

        logging.info(task_parameters["parameter"])
        if task_parameters["parameter"]<0.5:
            logging.info("SUCCESSS ")
            return ['branch_1', 'task_final']
        else:
            logging.info("RIP")
            return ['branch_2', 'task_final']

    @task(task_id="branch_1")
    def …
Run Code Online (Sandbox Code Playgroud)

python branch etl directed-acyclic-graphs airflow

3
推荐指数
1
解决办法
2826
查看次数

标签 统计

airflow ×1

branch ×1

directed-acyclic-graphs ×1

etl ×1

python ×1