小编Nic*_*asi的帖子

我们如何验证 DAG 对象中是否不存在循环

我正在为我的 ETL 编写单元测试,作为一个过程,我想测试所有 Dags 以确保它们没有循环。在阅读 Bas Harenslak 和 Julian de Ruiter 撰写的 Data Pipelines with Apache Airflow 后,我发现他们正在使用 DAG.test_cycle(),这里的 DAG 是从模块 airflow.models.dag 导入的,但是当我运行代码时,我收到一个错误 AttributeError :“DAG”对象没有属性“test_cycle”

\n

这是我的代码片段

\n
import glob\nimport importlib\nimport os\n\nimport pytest\nfrom airflow.models.dag import DAG\n\nDAG_PATH = os.path.join(os.path.dirname(file), \xe2\x80\x9c\xe2\x80\xa6\xe2\x80\x9d, \xe2\x80\x9c\xe2\x80\xa6\xe2\x80\x9d, \xe2\x80\x9cdags/**/*.py\xe2\x80\x9d)\nDAG_FILES = glob.glob(DAG_PATH, recursive=True)\n\n@pytest.mark.parametrize("dag_file", DAG_FILES)\ndef test_dag_integrity(dag_file):\n    module_name, _ = os.path.splitext(dag_file)\n    module_path = os.path.join(DAG_PATH, dag_file)\n    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)\n    module = importlib.util.module_from_spec(mod_spec)\n    mod_spec.loader.exec_module(module)\n\n    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]\n\n    assert dag_objects\n\n    for dag in dag_objects:\n        dag.test_cycle()\n
Run Code Online (Sandbox Code Playgroud)\n

pytest directed-acyclic-graphs airflow

3
推荐指数
2
解决办法
1584
查看次数

标签 统计

airflow ×1

directed-acyclic-graphs ×1

pytest ×1