小编Mar*_*k M的帖子

气流 - 回填的不同计划间隔

处理回填和持续运行的不同计划间隔的最佳方法是什么?

对于回填,我想使用每日间隔,但对于持续运行,我想使用每小时间隔。

我可以想到三种方法:

  1. 我看到的最简单的方法是在一个 .py 文件中定义两个 DAG。dag_backfill以每天为间隔,过去的开始日期和结束日期为datetime.now()dag_ongoing每小时间隔和开始日期为datetime.now(),在dag_backfill完成时接管。但是,这里不鼓励在一个文件中使用两个 DAG :

    我们确实支持每个 python 文件多个 DAG 定义,但不建议这样做,因为我们希望从故障和部署角度更好地隔离 DAG...

  2. 两个 .py 文件导入构成管道的相同 python 函数。我担心在这种方法中保持单独的文件一致。

  3. 只有一个每小时间隔的 DAG 检查运行日期是否超过过去 1 天,如果是,则仅在这些日期的午夜运行。我觉得这是不雅的,因为它会模糊回填将运行的时间表,至少从 gui 主页。

这个或已知的最佳实践有共同的模式吗?

python airflow

9
推荐指数
1
解决办法
352
查看次数

尝试下载文件时出现“无法获取”错误

使用files.download(). 在熊猫中创建数据框后,我的代码的结尾是:

#...other stuff that creates a ~50,000 row, ~8Mb CSV...
dataDf.to_csv('myFilename.csv', index=False)
files.download('myFilename.csv')
Run Code Online (Sandbox Code Playgroud)

运行这个我有时会得到错误 MessageError: TypeError: Failed to fetch

有时,再次重新运行单元会使下载工作,有时在失败的单元之后创建另一个单元并运行files.download('myFilename.csv')正常。

有谁知道如何阻止这种情况发生?或者知道为什么它只偶尔发生?

我想与非技术用户共享此文件,因此如果他们只需按第一个单元格侧面的播放按钮并自动下载文件,那将非常方便。

python google-colaboratory

5
推荐指数
0
解决办法
675
查看次数

导入airflow.DAG和airflow.models.DAG的区别

from airflow import DAG在文档中的教程中使用)和from airflow.models import DAG在示例 DAG 中使用)之间有什么区别吗?

我是 Airflow 的新手,不确定它们是否应该以不同的方式使用。

python airflow

5
推荐指数
1
解决办法
1701
查看次数

标签 统计

python ×3

airflow ×2

google-colaboratory ×1