气流:如何将读取 json 文件的方法放入本地库中

Ste*_* G. 3 python json google-cloud-platform airflow

我必须产生一些DAG。我已将 json 表架构文件保存在GCP 存储桶上。\n与Composer关联的 GCP 存储桶上的文件将在/home/airflow/gcs/dags/上重新映射。\n如果我定义读取 json 文件的方法,创建 dag,一切都会顺利。\n但是,如果我希望生成一些“通用代码”(用于将其放在我的库中),我无法使用库中的代码访问文件系统,在具体来说我无法使用 python json 库。

\n\n

奇怪的是,我在 dag 创建步骤之外定义了该方法,但只有在 dag 创建之后才调用它!

\n\n

为了完成讨论,如果库中的代码仅在内存对象中使用,我不会有任何问题。

\n\n

当我使用气流时遇到这个问题(作曲家在 GCP 驱动程序上使用 1.9)

\n\n

这是我的外部库:

\n\n
lib/\n    __init__.py\n    bb_airflow_utils.py\n
Run Code Online (Sandbox Code Playgroud)\n\n

在外部库上

\n\n
lib/\n    __init__.py\n    bb_airflow_utils.py\n
Run Code Online (Sandbox Code Playgroud)\n\n

在主要脚本上

\n\n
def load_json_file(fname):\n    #per far s\xc3\xac che il dag la veda\n    with open(fname, \'r\') as f:\n        d = json.load(f)\n    return d\n
Run Code Online (Sandbox Code Playgroud)\n\n

Airflow 忽略操作员并通过 UI 表示 dag 没有 SLA

\n

kax*_*xil 5

查看https://cloud.google.com/composer/docs/how-to/using/installing-python-dependency#install-local

您可以将常用代码放在单独的文件中,然后将其放在单独的文件夹中,如下例所示。

将依赖项放置在文件夹的子目录中dags/。要从子目录导入模块,模块路径中的每个子目录都必须包含__init__.py包标记文件。

在此示例中,依赖项是 coin_module.py:

dags/
  use_local_deps.py  # A DAG file.
  dependencies/
    __init__.py
    coin_module.py
Run Code Online (Sandbox Code Playgroud)

从 DAG 定义文件导入依赖项。

例如:

from dependencies import coin_module
Run Code Online (Sandbox Code Playgroud)