Tsu*_*ume 5 configuration airflow
在 Airflow 中,我们创建了几个 DAGS。其中一些共享公共属性,例如从中读取文件的目录。目前,这些属性被列为每个单独的 DAG 中的一个属性,这在未来显然会成为问题。假设目录名称要更改,我们必须进入每个 DAG 并更新这段代码(甚至可能丢失一个)。
我正在研究创建某种配置文件,该文件可以解析为 Airflow 并在需要某个属性时由各种 DAGS 使用,但我似乎找不到任何有关如何执行此操作的文档或指南。我能找到的大部分是关于设置连接 ID 的文档,但这不符合我的用例。
我的帖子的问题是,是否可以执行上述场景以及如何执行?
提前致谢。
根据您的设置,您可以通过几种方法来完成此操作:
您可以使用 DagFactory 类型的方法,让函数生成 DAG。您可以在这里找到一个示例
您可以将 JSON 配置存储为Airflow Variable,并对其进行解析以生成 DAG。您可以在管理 -> 变量中存储类似的内容:
[
{
"table": "users",
"schema": "app_one",
"s3_bucket": "etl_bucket",
"s3_key": "app_one_users",
"redshift_conn_id": "postgres_default"
},
{
"table": "users",
"schema": "app_two",
"s3_bucket": "etl_bucket",
"s3_key": "app_two_users",
"redshift_conn_id": "postgres_default"
}
]
Run Code Online (Sandbox Code Playgroud)
您的 DAG 可能会生成为:
sync_config = json.loads(Variable.get("sync_config"))
with dag:
start = DummyOperator(task_id='begin_dag')
for table in sync_config:
d1 = RedshiftToS3Transfer(
task_id='{0}'.format(table['s3_key']),
table=table['table'],
schema=table['schema'],
s3_bucket=table['s3_bucket'],
s3_key=table['s3_key'],
redshift_conn_id=table['redshift_conn_id']
)
start >> d1
Run Code Online (Sandbox Code Playgroud)
同样,您可以将该配置存储为本地文件并像打开任何其他文件一样打开它。请记住,对此的最佳答案将取决于您的基础设施和用例。
| 归档时间: |
|
| 查看次数: |
5137 次 |
| 最近记录: |