我有一个工作流程,其中有两个并行进程 (sentinel_run和sentinel_skip),它们应该根据条件运行或跳过,然后连接在一起 ( resolve)。我需要直接位于任一sentinel_任务下游的任务进行级联跳过,但是当它到达该resolve任务时,resolve应该运行,除非上游任一进程出现故障。
根据文档,“none_failed”触发规则应该有效:
none_failed:所有父级都没有失败(失败或upstream_failed),即所有父级都已成功或被跳过
这也是对相关问题的回答。
然而,当我实现一个简单的例子时,我看到的并不是这样:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago
dag = DAG(
"testing",
catchup=False,
schedule_interval="30 12 * * *",
default_args={
"owner": "test@gmail.com",
"start_date": days_ago(1),
"catchup": False,
"retries": 0
}
)
start = DummyOperator(task_id="start", dag=dag)
sentinel_run = ShortCircuitOperator(task_id="sentinel_run", dag=dag, python_callable=lambda: True)
sentinel_skip = ShortCircuitOperator(task_id="sentinel_skip", dag=dag, python_callable=lambda: False)
a = DummyOperator(task_id="a", dag=dag)
b …Run Code Online (Sandbox Code Playgroud)