Maa*_*yan 7 directed-acyclic-graphs airflow airflow-scheduler
我想了解 Airflow 是否支持跳过 DAG 中的任务以进行临时执行?
假设我的 DAG 图如下所示:task1 > task2 > task3 > task4
我想从 task3 手动启动我的 DAG,这样做的最佳方法是什么?
我已经阅读了关于ShortCircuitOperator,但我正在寻找更多的临时解决方案,一旦触发执行就可以应用。
谢谢!
Ben*_*ory 11
您可以结合ShortCircuitOperator在后台使用的 SkipMixin来跳过下游任务。
from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults
class mySkippingOperator(BaseOperator, SkipMixin)
@apply_defaults
def __init__(self,
condition,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.condition = condition
def execute(self, context):
if self.condition:
self.log.info('Proceeding with downstream tasks...')
return
self.log.info('Skipping downstream tasks...')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
self.log.info("Done.")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
20230 次 |
| 最近记录: |