She*_*esh 6 directed-acyclic-graphs airflow airflow-scheduler
让 \xe2\x80\x99s 说这是我的 dag:\nA >> B >> C
\n如果任务 B 引发异常,我想跳过该任务而不是失败。但是,我不想跳过任务 C。我研究了 AirflowSkipException 和 soft_fail 传感器,但它们也强制跳过下游任务。有人有办法让这个工作吗?
\n谢谢!
\n小智 8
目前发布的答案涉及不同的主题或似乎不完全正确。
向任务 C添加触发规则all_failed对于 OP 的示例 DAG 不起作用: A >> B >> C除非任务 A 以failed状态结束,这很可能是不可取的。
事实上,OP 非常接近,因为可以通过组合AirflowSkipException和none_failed触发规则来实现预期行为:
from datetime import datetime
from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
with DAG(
dag_id="mydag",
start_date=datetime(2022, 1, 18),
schedule_interval="@once"
) as dag:
def task_b():
raise AirflowSkipException
A = DummyOperator(task_id="A")
B = PythonOperator(task_id="B", python_callable=task_b)
C = DummyOperator(task_id="C", trigger_rule="none_failed")
A >> B >> C
Run Code Online (Sandbox Code Playgroud)
其中Airflow执行如下:
这条规则意味着什么?
触发规则
none_failed:所有上游任务均未失败或upstream_failed - 即所有上游任务均已成功或已被跳过
因此基本上我们可以捕获代码中的实际异常并引发提到的 Airflow 异常,该异常“强制”任务状态从failed变为skipped。然而,如果没有trigger_ruleTask-C 的参数,我们最终会将 Task-B 下游标记为skipped。
您可以更改trigger_rule任务声明中的 。
task = BashOperator(
task_id="task_C",
bash_command="echo hello world",
trigger_rule="all_done",
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
11406 次 |
| 最近记录: |