GCP Apache Airflow - 如何从私有存储库安装 Python 包并在 DAG 上导入?

duy*_*rim 3 python google-cloud-platform airflow google-cloud-composer

我有一个私人存储库。这个存储库有我的 DAG 的常用功能。(例如:日期时间验证器、响应编码器函数)我想将此存储库的函数导入到我的 DAG 文件中,我使用此链接来执行此操作。

我创建了pip.conf文件。该文件的位置是:my-bucket-name/config/pip/pip.conf并且我在该文件中添加了我的私人 github 存储库,如下所示:

[global]
extra-index-url=https://<token>@github.com/my-private-github-repo.git
Run Code Online (Sandbox Code Playgroud)

之后,我想在我的 dag 文件上导入此存储库的函数(例如:from common-repo import *),但我在我的 DAG 上收到 “模块未找到”错误。(不幸的是,在云作曲家日志中,我看不到任何显示私人 github 存储库已安装的日志。)

我搜索了很多,但找不到如何做到这一点。

Iñi*_*lez 6

您可以将私有存储库添加到PythonVirtualenvOperator中的需求中,如下所示:

from airflow import DAG
from airflow.decorators import task

@task.virtualenv(
   task_id="virtualenv_python",
   requirements=["https://<token>@github.com/my-private-github-repo.git"],
                 system_site_packages=False
)

def callable_from_virtualenv():
   import your_private_module

   ..etc...


virtualenv_task = callable_from_virtualenv()
Run Code Online (Sandbox Code Playgroud)

(示例摘自Airflow python 操作符示例

为了避免在源代码中硬编码令牌/凭证,您可以使用 Airflow 变量,如下所示:

from airflow.models import Variable

@task.virtualenv(
   task_id="virtualenv_python",
   requirements=[Variable.get("private_github_repo")],
                 system_site_packages=False
)

Run Code Online (Sandbox Code Playgroud)

  • 这对我很重要,非常感谢! (2认同)