Airflow: programmatically set a task instance's state to skipped

all*_*tej 1 airflow

I have a list that I loop over to create tasks. The size of the list is static.

        for counter, account_id in enumerate(ACCOUNT_LIST):
            task_id = f"bash_task_{counter}"
            if account_id:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
            else:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
                trigger_task.status = SKIPPED # is there a way to set the status of this task to skipped without using a branch operator?
            trigger_task


I tried this manually, but could not get the tasks to be skipped:

        start = DummyOperator(task_id='start')
        task1 = DummyOperator(task_id='task_1')
        task2 = DummyOperator(task_id='task_2')
        task3 = DummyOperator(task_id='task_3')
        task4 = DummyOperator(task_id='task_4')

        start >> task1
        start >> task2

        try:
            start >> task3
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task3')
            
        try:
            start >> task4
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task4')

小智 10

Yes, you need to raise AirflowSkipException:

from airflow.exceptions import AirflowSkipException

raise AirflowSkipException

See the source code for more information.
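One detail that may explain why the manual attempt in the question had no effect: there the exception is raised and caught while the DAG file is being parsed, so it never reaches a running task. The exception has to be raised from code that executes as part of the task itself. Below is a minimal sketch of that idea, assuming Airflow 2.4 or later (for the `schedule` argument) and swapping the BashOperator for a PythonOperator so there is a callable to raise from. The names `run_for_account`, `build_dag`, `skip_example` and the sample `ACCOUNT_LIST` values are hypothetical, and the fallback exception class exists only so the callable can be tried without Airflow installed.

```python
from datetime import datetime

try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    # Stand-in (assumption) so the callable can be exercised without Airflow.
    class AirflowSkipException(Exception):
        pass

# Hypothetical sample data; a falsy entry stands for "no account".
ACCOUNT_LIST = ["acct_a", None, "acct_c"]


def run_for_account(account_id):
    """Runs when the task executes, not when the DAG file is parsed."""
    if not account_id:
        # Raised inside the running task, so Airflow marks this
        # task instance as skipped rather than failed.
        raise AirflowSkipException("No account id, skipping this task")
    print(f"hello there, {account_id}")
    return account_id


def build_dag():
    """Build the DAG; Airflow imports are local to keep the sketch importable."""
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="skip_example",
        start_date=datetime(2023, 1, 1),
        schedule=None,  # assumes Airflow >= 2.4; older versions use schedule_interval
        catchup=False,
    ) as dag:
        for counter, account_id in enumerate(ACCOUNT_LIST):
            PythonOperator(
                task_id=f"python_task_{counter}",
                python_callable=run_for_account,
                op_args=[account_id],
            )
    return dag
```

In an actual DAG file you would add `dag = build_dag()` at module level so the scheduler can discover it. With the sample list above, the second task would be expected to end in the skipped state while the other two succeed.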