Apache Airflow 不强制执行 dagrun_timeout

Rac*_*ger 5 airflow

我将 Apache Airflow 版本 1.10.3 与顺序执行器一起使用,如果 DAG 尚未完成,我希望 DAG 在一段时间后失败。我尝试dagrun_timeout在示例代码中设置

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 1),
    'retries': 0,
}

dag = DAG('min_timeout', default_args=default_args, schedule_interval=timedelta(minutes=5), dagrun_timeout = timedelta(seconds=30), max_active_runs=1)

t1 = BashOperator(
    task_id='fast_task',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='slow_task',
    bash_command='sleep 45',
    dag=dag)

t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

slow_task单独花费的时间超过了设置的时间限制dagrun_timeout,所以我的理解是气流应该停止 DAG 执行。但是,这不会发生,并且允许 slow_task 在其整个持续时间内运行。发生这种情况后,运行将被标记为失败,但这不会根据需要终止任务或 DAG。使用execution_timeoutforslow_task确实会导致任务在指定的时间限制内被终止,但我更愿意为 DAG 使用总体时间限制,而不是execution_timeout为每个任务指定。

还有什么我应该尝试实现这种行为的,或者我可以修复的任何错误?

Olu*_*ule 4

Airflow 调度程序至少每隔一次运行一次循环SCHEDULER_HEARTBEAT_SEC(默认为5 秒)。

请记住at least这里,因为调度程序执行的一些操作可能会延迟其循环的下一个周期。

这些行动包括:

  • 解析 dags
  • 填充 DagBag
  • 检查 DagRun 并更新其状态
  • 安排下一个 DagRun

在您的示例中,延迟任务不会终止,dagrun_timeout因为调度程序在任务完成后执行下一个周期。