nor*_*bjd 8 python-2.7 airflow
I have the following DAG with 3 tasks:
start --> special_task --> end
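The middle task can either succeed or fail, but end must always be executed (think of it as a task that cleanly closes resources). To do so, I used the ALL_DONE trigger rule: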
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
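To make the difference concrete, here is a minimal pure-Python model of the two trigger rules involved. It is a simplified sketch, not Airflow's implementation: the function can_run and the lowercase state strings are assumptions chosen to mirror Airflow's names, and only ALL_SUCCESS (the default) and ALL_DONE are modelled.

```python
# Simplified model of two Airflow trigger rules (not Airflow's real code).
# Assumption: "done" means the upstream task reached a terminal state.
TERMINAL_STATES = {"success", "failed", "upstream_failed", "skipped"}


def can_run(trigger_rule, upstream_states):
    """Return True if a task with this trigger rule may start."""
    if trigger_rule == "all_success":
        # Default rule: every upstream task must have succeeded.
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        # ALL_DONE: every upstream task finished, whatever the outcome.
        return all(state in TERMINAL_STATES for state in upstream_states)
    raise ValueError("rule not modelled here: %s" % trigger_rule)


# special_task failed: only ALL_DONE lets `end` run.
print(can_run("all_success", ["failed"]))  # False
print(can_run("all_done", ["failed"]))     # True
```

This is exactly why end still runs after special_task fails, and also why its own success says nothing about whether the upstream work succeeded.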
With this, end is correctly executed when special_task fails. However, since end is the last task and it succeeds, the DAG is always marked as SUCCESS.
How can I configure my DAG so that, if one of the tasks fails, the whole DAG is marked as FAILED?
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import trigger_rule
dag = DAG(
    dag_id='my_dag',
    start_date=datetime.datetime.today(),
    schedule_interval=None
)
start = BashOperator(
    task_id='start',
    bash_command='echo start',
    dag=dag
)
special_task = BashOperator(
    task_id='special_task',
    bash_command='exit 1',  # force failure
    dag=dag
)
end = BashOperator(
    task_id='end',
    bash_command='echo end',
    dag=dag
)
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
start.set_downstream(special_task)
special_task.set_downstream(end)
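This post seems related, but its answer does not suit my needs, since the downstream task end must be executed (hence the trigger_rule is mandatory).
As @JustinasMarozas explained in a comment, a solution is to create a dummy task, for example: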
from airflow.operators.dummy_operator import DummyOperator
dummy = DummyOperator(
    task_id='test',
    dag=dag
)
and to set it downstream of special_task:
special_task.set_downstream(dummy)
As a result, the DAG is marked as failed, and the dummy task is marked as upstream_failed.
I hope there is an out-of-the-box solution, but until then, this one does the job.
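Why does the dummy task help? To my understanding, Airflow 1.x (in DagRun.update_state) derives a finished run's state from its leaf tasks, i.e. tasks with no downstream: if any leaf is failed or upstream_failed, the run fails; otherwise it succeeds. The sketch below models only that rule in plain Python. dag_run_state and the dictionaries are hypothetical names, and details such as deadlock detection are ignored.

```python
# Simplified model of how a DAG run's final state follows from its leaf
# tasks, assuming every task has already finished (not Airflow's real code).
FAILED_STATES = {"failed", "upstream_failed"}


def dag_run_state(downstream, task_states):
    """downstream maps task_id -> list of downstream task_ids."""
    leaves = [t for t in task_states if not downstream.get(t)]
    if any(task_states[t] in FAILED_STATES for t in leaves):
        return "failed"
    return "success"


# Question's DAG: `end` (ALL_DONE) is the only leaf, and it succeeded.
edges = {"start": ["special_task"], "special_task": ["end"]}
states = {"start": "success", "special_task": "failed", "end": "success"}
print(dag_run_state(edges, states))  # success

# Workaround: a dummy leaf downstream of special_task ends up upstream_failed.
edges["special_task"].append("dummy")
states["dummy"] = "upstream_failed"
print(dag_run_state(edges, states))  # failed
```

Under this model, the extra leaf is what flips the run to failed, while end still executes thanks to ALL_DONE.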
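I thought this was an interesting question and spent some time figuring out how to achieve it without an extra dummy task. It turned out a bit superfluous, but here is the end result.
Here is the full DAG: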
import airflow
from airflow import AirflowException
from airflow.models import DAG, TaskInstance, BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
default_args = {"owner": "airflow", "start_date": airflow.utils.dates.days_ago(3)}
dag = DAG(
    dag_id="finally_task_set_end_state",
    default_args=default_args,
    schedule_interval="0 0 * * *",
    description="Answer for question https://stackoverflow.com/questions/51728441",
)
start = BashOperator(task_id="start", bash_command="echo start", dag=dag)
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)
@provide_session
def _finally(task, execution_date, dag, session=None, **_):
    upstream_task_instances = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag.dag_id,
            TaskInstance.execution_date == execution_date,
            TaskInstance.task_id.in_(task.upstream_task_ids),
        )
        .all()
    )
    upstream_states = [ti.state for ti in upstream_task_instances]
    fail_this_task = State.FAILED in upstream_states
    print("Do logic here...")
    if fail_this_task:
        raise AirflowException("Failing task because one or more upstream tasks failed.")
finally_ = PythonOperator(
    task_id="finally",
    python_callable=_finally,
    trigger_rule=TriggerRule.ALL_DONE,
    provide_context=True,
    dag=dag,
)
succesful_task = DummyOperator(task_id="succesful_task", dag=dag)
start >> [failing_task, succesful_task] >> finally_
Look at the _finally function, which is called by the PythonOperator. There are a few key points here:
1. Decorate the function with @provide_session and add the argument session=None, so that you can query the Airflow database with session.
2. Query all upstream task instances of the current task:
upstream_task_instances = (
    session.query(TaskInstance)
    .filter(
        TaskInstance.dag_id == dag.dag_id,
        TaskInstance.execution_date == execution_date,
        TaskInstance.task_id.in_(task.upstream_task_ids),
    )
    .all()
)
3. From the returned task instances, collect the states and check whether State.FAILED is among them:
upstream_states = [ti.state for ti in upstream_task_instances]
fail_this_task = State.FAILED in upstream_states
4. Perform your own logic:
print("Do logic here...")
5. Finally, fail the task if fail_this_task=True:
if fail_this_task:
    raise AirflowException("Failing task because one or more upstream tasks failed.")
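The decision logic of _finally can be separated from the database query and checked on its own. This is a hedged sketch rather than the answer's code: run_finally and CleanupFailed are hypothetical names (CleanupFailed stands in for AirflowException), and plain strings replace the State constants (State.FAILED is the string "failed" in Airflow).

```python
# Pure-Python sketch of the `_finally` decision logic, with the database
# query factored out. CleanupFailed is a hypothetical AirflowException stand-in.


class CleanupFailed(Exception):
    pass


def run_finally(upstream_states, cleanup):
    """Always run cleanup, then fail if any upstream task failed."""
    fail_this_task = "failed" in upstream_states
    cleanup()  # the "Do logic here..." step runs regardless of upstream results
    if fail_this_task:
        raise CleanupFailed("Failing task because one or more upstream tasks failed.")


calls = []
run_finally(["success", "success"], lambda: calls.append("cleaned"))
try:
    run_finally(["success", "failed"], lambda: calls.append("cleaned"))
except CleanupFailed as exc:
    print("task failed:", exc)
print(calls)  # ['cleaned', 'cleaned']
```

Running the cleanup before raising mirrors the answer's order: the finally task still does its work, then fails, and since it is the leaf task the DAG run ends up failed. The answer checks only State.FAILED; whether to also treat "upstream_failed" as a failure is a design choice left open here.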
Final result: [screenshot not preserved]