如何跳过 Airflow 上的任务?

Maa*_*yan 7 directed-acyclic-graphs airflow airflow-scheduler

我想了解 Airflow 是否支持跳过 DAG 中的任务以进行临时执行?

假设我的 DAG 图如下所示:task1 > task2 > task3 > task4

我想从 task3 手动启动我的 DAG,这样做的最佳方法是什么?

我已经阅读了关于ShortCircuitOperator,但我正在寻找更多的临时解决方案,一旦触发执行就可以应用。

谢谢!

Ben*_*ory 11

您可以结合ShortCircuitOperator在后台使用的 SkipMixin来跳过下游任务。

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin)
    
    @apply_defaults
    def __init__(self,
                 condition,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition
    
    def execute(self, context):

        if self.condition:
           self.log.info('Proceeding with downstream tasks...')
           return

        self.log.info('Skipping downstream tasks...')

        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
       
        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")
Run Code Online (Sandbox Code Playgroud)