如何重新运行已经使用 TriggerDagrunoperator 执行的 dag?

Cdr*_*Cdr 3 python airflow airflow-scheduler

我有一个 dag,我正在使用以下运算符列表

  • TriggerDagrunoperator-触发另一个dag
  • ExternalTask​​Sensor-获取触发的dag的状态

我的用例:例如,如果整个流程成功完成,并且我发现中间的数据处理存在一些问题。我想从问题点开始针对特定执行日期重新运行作业。我清除了下游,这使得作业重新运行。但是,TriggerDagrunoperator 因以下问题而失败。

airflow.exceptions.DagRunAlreadyExists:运行 id 已触发_:dag id 已存在

我想清除这一点,并且需要在该特定执行日期再次重新运行 dag。有更好的方法来实现这一点吗?

Mar*_*rti 5

Airflow 2.0 带来了新版本的 TriggerDagRunOperator,允许重新运行过去的 DAGRun。现在比以前容易多了!参数reset_dag_runs正是您正在寻找的。
我制作了一个有关它的视频,如果可以帮助您的话https://youtu.be/8uKW0mPWmCk
祝你有美好的一天!