我刚刚加入一家新公司,我正在尝试学习 Airflow,因为我正在使用它。到目前为止,除了外部任务传感器之外,我已经掌握了大多数内容的基础知识。
我有两个 DAG,DAG A 的计划间隔为 ,"0 6 * * *"DAG B 的计划间隔"0 7 * * *"为 DAG A 等待 DAG B 完成后再继续。然而,DAG B 有时需要 3 小时才能完成,有时则需要 10 多个小时。
我创建了一个外部任务传感器,如下所示,但即使 DAG B 完成,它也不会触发和超时。
ExternalTaskSensor(
task_id = "wait_sensor",
external_dag_id="dag_b",
external_task_id = "end",
poke_interval = 60*30,
timeout=60*60,
retries = 10,
execution_delta= timedelta(hours=2),
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
如何正确设置传感器?
小智 5
DAG A 的执行日期比 DAG B 早一小时,并且您将执行增量设置为 2 小时,这意味着 DAG A 外部传感器正在尝试查找执行日期为 0 4 * * * 的 DAG B,而该执行日期不存在。在这种情况下,您的外部传感器任务会因超时而失败。您可以设置check_existence=True立即失败,而不是等待 10 次重试。此外,您还可以在外部任务传感器日志中查看其所在的外部任务执行日期:
[2022-12-02, 08:21:36 UTC] {external_task.py:206} INFO - Poking for tasks ['test_task'] in dag test_dag on 2022-12-02T08:25:00+00:00 ...
解决方案:
从传感器文档中,“对于昨天,使用 [正!] datetime.timedelta(days=1)”
因此,要解决此问题,您需要提供 1 小时的负execution_delta,因为 DAG A 的执行日期恰好比 DAG B 晚 1 小时:
execution_delta=timedelta(hours=-1)
| 归档时间: |
|
| 查看次数: |
10474 次 |
| 最近记录: |