每个数据的气流达格伦,而不是预定的

Seb*_*ian 5 airflow prefect

我目前面临的问题是我在 MongoDB 集合中有文档,每个文档都需要由需要在非循环依赖图中运行的任务进行处理和更新。如果上游任务无法处理文档,则任何依赖任务都无法处理该文档,因为该文档尚未使用先决条件信息进行更新。

如果我要使用 Airflow,这给我留下了两个解决方案:

  1. 为每个文档触发一个 DAG,并传入带有--conf. 问题在于这不是 Airflow 的预期使用方式。我永远不会运行预定的流程,并且根据文档在集合中的显示方式,我每天会制作 1440 个 Dagruns。

  2. 每个时期运行一个 DAG 以处理该时期在集合中创建的所有文档。这遵循 Airflow 的工作方式,但问题是如果任务无法处理单个文档,则任何依赖任务都无法处理任何其他文档。此外,如果某个文档需要比其他文档更长的时间来处理任务,则这些其他文档将等待该单个文档继续沿 DAG 执行。

有没有比 Airflow 更好的方法?或者在 Airflow 中是否有比我目前看到的两种方法更好的方法来处理这个问题?

Tag*_*gar 0

您可以trigger_rule从“all_success”更改为“all_done”

https://github.com/apache/airflow/blob/62b21d747582d9d2b7cdcc34a326a8a060e2a8dd/airflow/example_dags/example_latest_only_with_trigger.py#L40

并且还可以创建一个分支来处理失败的文档,并将其trigger_rule设置为“one_failed”,以某种方式以不同的方式移动处理这些失败的文档(例如移动到“失败”文件夹并发送通知)