Apache Airflow最佳实践:(Python)运算符或BashOperators

7 airflow apache-airflow

这些天我正在开发一个新的ETL项目,我想尝试一下Airflow作为工作经理.我和我的同事都是第一次使用Airflow,我们遵循两种不同的方法:我决定编写python函数(运算符,如apache-airflow项目中包含的那些),而我的同事使用气流来调用外部python脚本通过BashOperator.

我想知道是否有类似"良好实践"的东西,如果这两种方法同样好,或者我应该考虑另一种方法.

对我来说,主要区别在于: - 使用BashOperator,您可以使用特定包的特定python环境调用python脚本 - 使用BashOperator任务更独立,并且可以在气流发生时手动启动 - 使用BashOperator任务进行任务通信管理起来有点困难 - 使用BashOperator任务错误和故障更难管理(bash任务如何知道它之前的任务是否失败或成功?).

你怎么看?

Dan*_*ang 8

在这些情况下,我个人更喜欢使用 PythonOperator 而不是 BashOperator。这是我所做的以及为什么:

  • 包含我所有 DAG 的单个存储库。这个 repo 也有一个setup.py包含气流作为依赖项,以及我的 DAG 需要的任何其他东西。Airflow 服务从安装这些依赖项的 virtualenv 运行。这将处理您提到的有关 BashOperator 的 python 环境。
  • 我尝试将所有与 Airflow 无关的 Python 逻辑放在它自己的外部打包的 Python 库中。该代码应该有自己的单元测试,也有自己的主要内容,因此可以在独立于 Airflow 的命令行上调用它。这解决了您关于 Airflow 何时发疯的观点!
  • 如果逻辑足够小以至于无法分离到自己的库中,我会将它放在我的 DAG 存储库中的 utils 文件夹中,当然还有单元测试。
  • 然后我在 Airflow 中使用 PythonOperator 调用此逻辑。与 BashOperator 模板脚本不同,python 可调用文件可以轻松进行单元测试。这也意味着您可以访问诸如启动 Airflow DB 会话、将多个值推送到 XCom 等内容。
  • 就像您提到的那样,使用 Python 处理错误会更容易一些。您可以轻松捕获异常并检查返回值。您可以选择将任务标记为已跳过raise AirflowSkipException

仅供参考 BashOperator,如果脚本退出并显示错误代码,Airflow 会将任务标记为失败。

  • 如果您有 50 多个独立的工作流程,每个工作流程都需要独立的版本和库演变,我认为这种单一存储库单一环境实际上不起作用。 (5认同)