如何在 Airflow 中使用配置文件

Tsu*_*ume 5 configuration airflow

在 Airflow 中,我们创建了几个 DAGS。其中一些共享公共属性,例如从中读取文件的目录。目前,这些属性被列为每个单独的 DAG 中的一个属性,这在未来显然会成为问题。假设目录名称要更改,我们必须进入每个 DAG 并更新这段代码(甚至可能丢失一个)。

我正在研究创建某种配置文件,该文件可以解析为 Airflow 并在需要某个属性时由各种 DAGS 使用,但我似乎找不到任何有关如何执行此操作的文档或指南。我能找到的大部分是关于设置连接 ID 的文档,但这不符合我的用例。

我的帖子的问题是,是否可以执行上述场景以及如何执行?

提前致谢。

Vir*_*ekh 5

根据您的设置,您可以通过几种方法来完成此操作:

  • 您可以使用 DagFactory 类型的方法,让函数生成 DAG。您可以在这里找到一个示例

  • 您可以将 JSON 配置存储为Airflow Variable,并对其进行解析以生成 DAG。您可以在管理 -> 变量中存储类似的内容:


[
  {
    "table": "users",
    "schema": "app_one",
    "s3_bucket": "etl_bucket",
    "s3_key": "app_one_users",
    "redshift_conn_id": "postgres_default"
  },
  {
    "table": "users",
    "schema": "app_two",
    "s3_bucket": "etl_bucket",
    "s3_key": "app_two_users",
    "redshift_conn_id": "postgres_default"
  }
]

Run Code Online (Sandbox Code Playgroud)

您的 DAG 可能会生成为:

sync_config = json.loads(Variable.get("sync_config"))

with dag:
    start = DummyOperator(task_id='begin_dag')
    for table in sync_config:
        d1 = RedshiftToS3Transfer(
            task_id='{0}'.format(table['s3_key']),
            table=table['table'],
            schema=table['schema'],
            s3_bucket=table['s3_bucket'],
            s3_key=table['s3_key'],
            redshift_conn_id=table['redshift_conn_id']
        )
        start >> d1
Run Code Online (Sandbox Code Playgroud)

同样,您可以将该配置存储为本地文件并像打开任何其他文件一样打开它。请记住,对此的最佳答案将取决于您的基础设施和用例。