Fad*_*ura 5 airflow airflow-scheduler
假设我们有两个 DAG,dag1 和 dag2,它们服务于不同的业务需求。它们完全无关。但 dag1 更重要的是尽早完成它。
为简单起见,它们都只有一项任务并且每天运行。
在 dag1 落后于计划 2 或 3 天的情况下,我想确保 dag1 运行并首先完成其 dag_runs,即 dag1 在 dag2 能够继续之后是最新的。
我尝试过 priority_weight 但它不适用于不同的 dag。
我需要一种将来自两个不同 dag 的任务放在同一个队列中并实现 DAG 级别优先级的方法。
来自外部任务传感器的官方文档:
Waits for a different DAG or a task in a different DAG to complete for
a specific execution_date.
:param external_dag_id: The dag_id that contains the task you want to
wait for
:type external_dag_id: str
:param external_task_id: The task_id that contains the task you want to
wait for. If ``None`` the sensor waits for the DAG
:type external_task_id: str
:param allowed_states: list of allowed states, default is ``['success']``
:type allowed_states: list
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task or DAG.
For yesterday, use [positive!] datetime.timedelta(days=1). Either
execution_delta or execution_date_fn can be passed to
ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date
and returns the desired execution dates to query. Either execution_delta
or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
:param check_existence: Set to `True` to check if the external task exists (when
external_task_id is not None) or check if the DAG to wait for exists (when
external_task_id is None), and immediately cease waiting if the external task
or DAG does not exist (default value: False).
:type check_existence: bool
Run Code Online (Sandbox Code Playgroud)
两个 DAG 都应将depends_on_past “触发规则”设置为True,以便只有在先前计划的运行成功完成的情况下才会执行较新计划的 DAG 运行。
然后在 Dag 2 的开头添加外部任务传感器(稍后执行的传感器)。
或者,您可以创建自己的自定义传感器并通过Airflow 插件使用它,以便检查元数据库以了解 Dag 运行的状态。
您还可以构建客户传感器,利用Airflow XCOM或Airflow 变量将执行运行时间或任何其他Airflow 宏传递到 DAG 2 中的传感器。
| 归档时间: |
|
| 查看次数: |
2409 次 |
| 最近记录: |