我将 Apache Airflow 版本 1.10.3 与顺序执行器一起使用,如果 DAG 尚未完成,我希望 DAG 在一段时间后失败。我尝试dagrun_timeout
在示例代码中设置
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2019, 6, 1),
'retries': 0,
}
dag = DAG('min_timeout', default_args=default_args, schedule_interval=timedelta(minutes=5), dagrun_timeout = timedelta(seconds=30), max_active_runs=1)
t1 = BashOperator(
task_id='fast_task',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='slow_task',
bash_command='sleep 45',
dag=dag)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)
slow_task
单独花费的时间超过了设置的时间限制dagrun_timeout
,所以我的理解是气流应该停止 DAG 执行。但是,这不会发生,并且允许 slow_task 在其整个持续时间内运行。发生这种情况后,运行将被标记为失败,但这不会根据需要终止任务或 DAG。使用execution_timeout
forslow_task
确实会导致任务在指定的时间限制内被终止,但我更愿意为 DAG 使用总体时间限制,而不是execution_timeout
为每个任务指定。
还有什么我应该尝试实现这种行为的,或者我可以修复的任何错误?
归档时间: |
|
查看次数: |
566 次 |
最近记录: |