气流 - 在任务之间锁定,以便一次只运行一个并行任务?

Kyl*_*ine 3 python-3.x airflow

我有一个DAG有三个任务流(licappts,agent,agentpolicy):

在此输入图像描述

为简单起见,我称这三个不同的流.流是独立的,因为agentpolicy失败并不意味着其他两个(liceappts和agent)应该受到其他流失败的影响.

但是对于sourceType _emr_task_1任务(即licappts_emr_task_1,agents_emr_task_1和agentpolicy_emr_task_1),我一次只能运行其中一个任务.例如,我不能同时运行agents_emr_task_1和agentpolicy_emr_task_1,即使它们是两个不一定关心的独立任务.

如何在Airflow中实现此功能?现在我唯一可以想到的是将该任务包装在一个以某种方式锁定全局变量的脚本中,然后如果该变量被锁定,我将让脚本执行Thread.sleep(60秒)或其他东西,然后重试.但这似乎非常hacky,我很好奇Airflow为此提供了解决方案.

如果需要实现这一点,我愿意重组我的DAG的订单.我想做的一件事是做一个硬编码的排序

Dag Starts -> ... -> licappts_emr_task_1 -> agents_emr_task_1 -> agentpolicy_emr_task_1 -> DAG Finished
Run Code Online (Sandbox Code Playgroud)

但我不认为以这种方式组合流,因为例如agentpolicy_emr_task_1必须等待其他两个完成才能启动,并且有时agentpolicy_emr_task_1准备好在其他两个完成其他任务之前完成.

理想情况下,我希望任何sourceType _emr_task_1任务首先启动它,然后阻止其他任务运行其sourceType _emr_task_1任务,直到它完成为止.

更新:

我刚才想到的另一个解决方案是,如果我有办法检查另一个任务的状态,我可以为sourceType _emr_task_1 创建一个脚本,检查其他两个sourceType _emr_task_1任务是否有任何运行状态,以及如果他们这样做,它会睡觉,并定期检查是否其他人都没有运行,在这种情况下,它将启动它的过程.我不是这种方式的忠实粉丝,因为我觉得它可能会导致竞争条件,其中两者都在读取(同时)没有运行并且都开始运行.

cwu*_*rtz 7

您可以使用来确保这些任务的并行性为1.

对于每个*_emr_task_1任务,将poolkwarg 设置为类似的东西pool=emr_task.

然后进入网络服务器 - > admin - > pools - > create:设置名称Pool以匹配运算符中使用的池,并将其Slots设置为1.

这将确保调度程序仅允许任务为该池排队,最多配置的插槽数,而不管其余Airflow的并行性.

  • airflow.apache.org/concepts.html#pools你是我的英雄.你和泰勒之间我欠天文学家很多:) (2认同)