Airflow调度程序是否有可能在开始下一个循环之前首先完成前一天的循环?

use*_*930 11 python apache scheduler python-3.x airflow

现在,我的DAG中的节点在该DAG的其余节点完成之前进入第二天的任务.有没有办法等待DAG的其余部分完成,然后才能进入第二天的DAG循环?

(我确实将depends_on_previous视为真,但在这种情况下不起作用)

我的DAG看起来像这样:

               O
               l
               V
O -> O -> O -> O -> O
Run Code Online (Sandbox Code Playgroud)

另外,dag的树视图]

树视图pic的dag

Ole*_*min 14

对于这个答案可能有点晚了,但我遇到了同样的问题,我解决它的方式是我在每个dag中添加了两个额外的任务.开头的"上一个"和结尾的"完成".以前的任务是监视先前作业的外部任务传感器.完成只是一个虚拟操作员.让我们说它每30分钟运行一次,所以dag看起来像这样:

dag = DAG(dag_id='TEST_DAG', default_args=default_args, schedule_interval=timedelta(minutes=30))

PREVIOUS = ExternalTaskSensor(
    task_id='Previous_Run',
    external_dag_id='TEST_DAG',
    external_task_id='All_Tasks_Completed',
    allowed_states=['success'],
    execution_delta=timedelta(minutes=30),
    dag=DAG
)

T1 = BashOperator(
    task_id='TASK_01',
    bash_command='echo "Hello World from Task 1"',
    dag=dag
)

COMPLETE = DummyOperator(
    task_id='All_Tasks_Completed',
    dag=DAG
)

PREVIOUS >> T1 >> COMPLETE
Run Code Online (Sandbox Code Playgroud)

所以下一个dag,即使它将进入队列,它也不会让任务在PREVIOUS完成之前运行.

  • 为了避免对其他人有帮助,我遇到了一些问题-1)确保您未使用单线程的`SequentialExecutor`(默认设置)。这将导致您的第二个Sensor在您的第一个实际任务之前运行,并且您将处于任务处于排队状态的死锁状态。2)您可能要在开头添加`LatestOnlyOperator`来跳过回填3)如果这样做,则需要更新`allowed_states`以包含`“ skipped”`状态,4)如果不这样做您需要手动将第一个任务标记为成功,以摆脱最初的死锁情况。 (3认同)

小智 6

最终为我工作的是以下的组合

  1. 添加任务依赖项:wait_for_downstream=True、depends_on_past=True
  2. 创建 dag 时添加 max_active_runs:1。我确实尝试添加 max_active_runs 作为默认参数,但这不起作用。


use*_*545 4

如果您只想一次运行一个实例,请尝试设置 max_active_runs=1

  • @nono我同意乔布的观点。换句话说,通过设置 max_active_runs=1,不能保证前一个 DAG 实例成功完成。 (4认同)
  • @nono如果我猜(我没有否决它)那是因为这个答案不强制执行排序b (2认同)