Jak*_*ger 5 airflow google-cloud-composer
Airflow封装的 DAG似乎是实现合理的生产气流部署的重要组成部分。
我有一个带有动态 subDAG 的 DAG,由配置文件驱动,例如:
配置文件:
imports:
- project_foo
- project_bar`
Run Code Online (Sandbox Code Playgroud)
这会产生 subdag 任务,例如imports.project_{foo|bar}.step{1|2|3}.
我通常使用 python 的open函数读取配置文件,一个 laconfig = open(os.path.join(os.path.split(__file__)[0], 'config.yaml')
不幸的是,当使用打包的 DAG 时,这会导致错误:
Broken DAG: [/home/airflow/dags/workflows.zip] [Errno 20] Not a directory: '/home/airflow/dags/workflows.zip/config.yaml'
Run Code Online (Sandbox Code Playgroud)
有什么想法/最佳实践可以在这里推荐吗?
这有点混乱,但我最终只是通过 .zip 文件内容读取ZipFile。
import yaml
from zipfile import ZipFile
import logging
import re
def get_config(yaml_filename):
"""Parses and returns the given YAML config file.
For packaged DAGs, gracefully handles unzipping.
"""
zip, post_zip = re.search(r'(.*\.zip)?(.*)', yaml_filename).groups()
if zip:
contents = ZipFile(zip).read(post_zip.lstrip('/'))
else:
contents = open(post_zip).read()
result = yaml.safe_load(contents)
logging.info('Parsed config: %s', result)
return result
Run Code Online (Sandbox Code Playgroud)
它的工作方式正如您对 main 的期望dag.py:
get_config(os.path.join(path.split(__file__)[0], 'config.yaml'))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2261 次 |
| 最近记录: |