我一直在评估气流.我有这个用例,我有一个工作流程,每小时运行一次,以获得每小时的数据聚合.和另一个每天运行以获得相同的每日聚合.是否有可能创建一个组合工作流,只有在过去一天所有小时聚合都成功时,每日聚合才会运行?我已经看到你可以创建子dag但两个dag可以以不同的频率运行吗?如果是的如何?
不确定您希望它如何工作,但虽然没有直接的方法来做到这一点,但您可以通过几种方法使用广泛的气流运算符套件来构建这样的 dag。
例如,您可以创建每小时 dags depend_on_past,然后使用 python 分支运算符使日聚合任务/dag 在当天最后一次运行的每小时 dag 结束时运行/触发。查看PythonBranchOperator和TriggerDagRunOperator。
您还可以为每日聚合器创建自己的传感器,以确保当天的所有每小时数据都成功。查看一下ExternalTaskSensor以供参考。