如何限制气流一次只运行1个DAG?

Ngu*_*Tín 4 python concurrency airflow

我希望DAG中的所有任务在下一次运行的第一个任务执行之前全部完成。

我有max_active_runs = 1,但是仍然发生。

default_args = {
'depends_on_past': True,
'wait_for_downstream': True,
'max_active_runs': 1,
'start_date': datetime(2018, 03, 04),
'owner': 'tin.nguyen',
'email': ['tin.nguyen@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=4)
}

dag = DAG('example', default_args=default_args, schedule_interval = schedule_interval)
Run Code Online (Sandbox Code Playgroud)

(我的所有任务都取决于上一个任务。气流版本为1.8.0)

谢谢

Ngu*_*Tín 12

我更改为将max_active_runs参数DAG()而不是放在default_arguments中,并且它起作用了。

感谢SimonD给我的想法,尽管没有在您的答案中直接指出。


Sim*_*onD 5

传递'max_active_runs': 1到DAG对象是实现此目的的方法。您应该如何设置它,因为我没有遇到这个问题(即使是1.7.1.3)也存在问题。

这是DAG的示例:

dag_args = {
    'owner': 'Owner',
    'depends_on_past': False,
    'start_date': datetime(2018, 01, 1, 12, 00),
    'email_on_failure': False
}

sched = timedelta(hours=1)
dag = DAG(job_id, default_args=dag_args, schedule_interval=sched, max_active_runs=1)
Run Code Online (Sandbox Code Playgroud)

如果您的dag正在运行的任务实际上是sub-dag,则您可能也需要传递max_active_runs到subdag,但是对此不是100%肯定的。