我正在为我的 ETL 编写单元测试,作为一个过程,我想测试所有 Dags 以确保它们没有循环。在阅读 Bas Harenslak 和 Julian de Ruiter 撰写的 Data Pipelines with Apache Airflow 后,我发现他们正在使用 DAG.test_cycle(),这里的 DAG 是从模块 airflow.models.dag 导入的,但是当我运行代码时,我收到一个错误 AttributeError :“DAG”对象没有属性“test_cycle”
\n这是我的代码片段
\nimport glob\nimport importlib\nimport os\n\nimport pytest\nfrom airflow.models.dag import DAG\n\nDAG_PATH = os.path.join(os.path.dirname(file), \xe2\x80\x9c\xe2\x80\xa6\xe2\x80\x9d, \xe2\x80\x9c\xe2\x80\xa6\xe2\x80\x9d, \xe2\x80\x9cdags/**/*.py\xe2\x80\x9d)\nDAG_FILES = glob.glob(DAG_PATH, recursive=True)\n\n@pytest.mark.parametrize("dag_file", DAG_FILES)\ndef test_dag_integrity(dag_file):\n module_name, _ = os.path.splitext(dag_file)\n module_path = os.path.join(DAG_PATH, dag_file)\n mod_spec = importlib.util.spec_from_file_location(module_name, module_path)\n module = importlib.util.module_from_spec(mod_spec)\n mod_spec.loader.exec_module(module)\n\n dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]\n\n assert dag_objects\n\n for dag in dag_objects:\n dag.test_cycle()\nRun Code Online (Sandbox Code Playgroud)\n