在气流中以不同的计划间隔设置外部任务传感器的正确方法是什么?

cod*_*rer 5 airflow

我刚刚加入一家新公司,我正在尝试学习 Airflow,因为我正在使用它。到目前为止,除了外部任务传感器之外,我已经掌握了大多数内容的基础知识。

我有两个 DAG,DAG A 的计划间隔为 ,"0 6 * * *"DAG B 的计划间隔"0 7 * * *"为 DAG A 等待 DAG B 完成后再继续。然而,DAG B 有时需要 3 小时才能完成,有时则需要 10 多个小时。

我创建了一个外部任务传感器,如下所示,但即使 DAG B 完成,它也不会触发和超时。

ExternalTaskSensor(
    task_id = "wait_sensor",
    external_dag_id="dag_b",
    external_task_id = "end",
    poke_interval = 60*30,
    timeout=60*60,
    retries = 10,
    execution_delta= timedelta(hours=2),
    dag=dag 
)
Run Code Online (Sandbox Code Playgroud)

如何正确设置传感器?

小智 5

DAG A 的执行日期比 DAG B 早一小时,并且您将执行增量设置为 2 小时,这意味着 DAG A 外部传感器正在尝试查找执行日期为 0 4 * * * 的 DAG B,而该执行日期不存在。在这种情况下,您的外部传感器任务会因超时而失败。您可以设置check_existence=True立即失败,而不是等待 10 次重试。此外,您还可以在外部任务传感器日志中查看其所在的外部任务执行日期:

[2022-12-02, 08:21:36 UTC] {external_task.py:206} INFO - Poking for tasks ['test_task'] in dag test_dag on 2022-12-02T08:25:00+00:00 ...

解决方案:

传感器文档中,“对于昨天,使用 [正!] datetime.timedelta(days=1)”

因此,要解决此问题,您需要提供 1 小时的负execution_delta,因为 DAG A 的执行日期恰好比 DAG B 晚 1 小时:

execution_delta=timedelta(hours=-1)