Ngu*_*Tín 4 python concurrency airflow
我希望DAG中的所有任务在下一次运行的第一个任务执行之前全部完成。
我有max_active_runs = 1,但是这仍然发生。
default_args = {
'depends_on_past': True,
'wait_for_downstream': True,
'max_active_runs': 1,
'start_date': datetime(2018, 03, 04),
'owner': 'tin.nguyen',
'email': ['tin.nguyen@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=4)
}
dag = DAG('example', default_args=default_args, schedule_interval = schedule_interval)
Run Code Online (Sandbox Code Playgroud)
(我的所有任务都取决于上一个任务。气流版本为1.8.0)
谢谢
Ngu*_*Tín 12
我更改为将max_active_runs
参数DAG()
而不是放在default_arguments中,并且它起作用了。
感谢SimonD给我的想法,尽管没有在您的答案中直接指出。
传递'max_active_runs': 1
到DAG对象是实现此目的的方法。您应该如何设置它,因为我没有遇到这个问题(即使是1.7.1.3)也存在问题。
这是DAG的示例:
dag_args = {
'owner': 'Owner',
'depends_on_past': False,
'start_date': datetime(2018, 01, 1, 12, 00),
'email_on_failure': False
}
sched = timedelta(hours=1)
dag = DAG(job_id, default_args=dag_args, schedule_interval=sched, max_active_runs=1)
Run Code Online (Sandbox Code Playgroud)
如果您的dag正在运行的任务实际上是sub-dag,则您可能也需要传递max_active_runs
到subdag,但是对此不是100%肯定的。