处理回填和持续运行的不同计划间隔的最佳方法是什么?
对于回填,我想使用每日间隔,但对于持续运行,我想使用每小时间隔。
我可以想到三种方法:
我看到的最简单的方法是在一个 .py 文件中定义两个 DAG。dag_backfill以每天为间隔,过去的开始日期和结束日期为datetime.now(),dag_ongoing每小时间隔和开始日期为datetime.now(),在dag_backfill完成时接管。但是,这里不鼓励在一个文件中使用两个 DAG :
我们确实支持每个 python 文件多个 DAG 定义,但不建议这样做,因为我们希望从故障和部署角度更好地隔离 DAG...
两个 .py 文件导入构成管道的相同 python 函数。我担心在这种方法中保持单独的文件一致。
只有一个每小时间隔的 DAG 检查运行日期是否超过过去 1 天,如果是,则仅在这些日期的午夜运行。我觉得这是不雅的,因为它会模糊回填将运行的时间表,至少从 gui 主页。
这个或已知的最佳实践有共同的模式吗?
使用files.download(). 在熊猫中创建数据框后,我的代码的结尾是:
#...other stuff that creates a ~50,000 row, ~8Mb CSV...
dataDf.to_csv('myFilename.csv', index=False)
files.download('myFilename.csv')
Run Code Online (Sandbox Code Playgroud)
运行这个我有时会得到错误 MessageError: TypeError: Failed to fetch
有时,再次重新运行单元会使下载工作,有时在失败的单元之后创建另一个单元并运行files.download('myFilename.csv')正常。
有谁知道如何阻止这种情况发生?或者知道为什么它只偶尔发生?
我想与非技术用户共享此文件,因此如果他们只需按第一个单元格侧面的播放按钮并自动下载文件,那将非常方便。
from airflow import DAG(在文档中的教程中使用)和from airflow.models import DAG(在示例 DAG 中使用)之间有什么区别吗?
我是 Airflow 的新手,不确定它们是否应该以不同的方式使用。