Apache Airflow:将任务延迟一段时间

Spa*_*ngh 8 python airflow

我试图在 DAG 内的父任务 5 分钟后执行任务。

DAG:任务 1 ----> 等待 5 分钟 ----> 任务 2

如何在 Apache Airflow 中实现这一点?提前致谢。

y2k*_*ham 11

上述行为可以通过引入一项任务来实现,该任务强制您Task 1Task 2


这可以使用 PythonOperator

import time
from airflow.operators.python_operator import PythonOperator

delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=my_dag,
                                                   python_callable=lambda: time.sleep(300))

task_1 >> delay_python_task >> task_2
Run Code Online (Sandbox Code Playgroud)

或使用BashOperator以及

from airflow.operators.bash_operator import BashOperator
delay_bash_task: BashOperator = BashOperator(task_id="delay_bash_task",
                                             dag=my_dag,
                                             bash_command="sleep 5m")
task_1 >> delay_bash_task >> task_2
Run Code Online (Sandbox Code Playgroud)

注意:给定的代码片段未经测试


参考


更新-1

以下是引入延迟的其他一些方法

  • on_success_callback/ on_failure_callback:根据是否Task 2应该在 成功或失败时运行Task 1,您可以传入以下lambda: time.sleep(300)任一参数Task 1
  • pre_execute()/ post_execute(): 调用time.sleep(300)in Task 1'spost_execute()Task 2'spre_execute()也有同样的效果。当然,这将涉及为您的tasks(1 或 2)修改代码,因此最好避免它

我个人更喜欢额外的task方法,因为它使事情更加明确并且不会错误地夸大您Task 1Task 2

  • **@Spandan Singh** 我可以想到 2 种可能的解决方法 **[1]** 有一个*连续运行的 DAG*,它使用 `TriggerDagRunOperator` 在正确的时间触发其他 dag **[2]** 继续触发您的 dag经常,如果还没有到合适的时间,则使用“AirflowSkipException”或“ShortCircuitOperator”跳过执行 (2认同)
  • **@Deepak Tripathi** 我承认这是提议的解决方案中的一个漏洞。我有一段时间没有使用 Airflow,但也请查看[此线程](/sf/ask/4183000601/) (2认同)