Nic*_*asi 3 pytest directed-acyclic-graphs airflow
我正在为我的 ETL 编写单元测试,作为一个过程,我想测试所有 Dags 以确保它们没有循环。在阅读 Bas Harenslak 和 Julian de Ruiter 撰写的 Data Pipelines with Apache Airflow 后,我发现他们正在使用 DAG.test_cycle(),这里的 DAG 是从模块 airflow.models.dag 导入的,但是当我运行代码时,我收到一个错误 AttributeError :“DAG”对象没有属性“test_cycle”
\n这是我的代码片段
\nimport glob\nimport importlib\nimport os\n\nimport pytest\nfrom airflow.models.dag import DAG\n\nDAG_PATH = os.path.join(os.path.dirname(file), \xe2\x80\x9c\xe2\x80\xa6\xe2\x80\x9d, \xe2\x80\x9c\xe2\x80\xa6\xe2\x80\x9d, \xe2\x80\x9cdags/**/*.py\xe2\x80\x9d)\nDAG_FILES = glob.glob(DAG_PATH, recursive=True)\n\n@pytest.mark.parametrize("dag_file", DAG_FILES)\ndef test_dag_integrity(dag_file):\n module_name, _ = os.path.splitext(dag_file)\n module_path = os.path.join(DAG_PATH, dag_file)\n mod_spec = importlib.util.spec_from_file_location(module_name, module_path)\n module = importlib.util.module_from_spec(mod_spec)\n mod_spec.loader.exec_module(module)\n\n dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]\n\n assert dag_objects\n\n for dag in dag_objects:\n dag.test_cycle()\nRun Code Online (Sandbox Code Playgroud)\n
在 Airflow2.0.0或更高版本中,您可以使用test_cycle()以 a 作为参数的函数dag:
def test_cycle(dag):
"""
Check to see if there are any cycles in the DAG. Returns False if no cycle found,
otherwise raises exception.
"""
Run Code Online (Sandbox Code Playgroud)
像这样导入:
from airflow.utils.dag_cycle_tester import test_cycle
您可以在DagBag类的定义中找到一个示例。
小智 5
如果有人在使用 test_cycle 方法时遇到 @adan11 提到的相同问题,问题就出在我们编译测试套件(即我们的测试工具)时。pytest,将在导入时解析 test_cycle 方法。
例如
# Pytest will collect this function
from airflow.utils.dag_cycle_tester import test_cycle
Run Code Online (Sandbox Code Playgroud)
当我们使用如下所示的别名时,Pytest 将忽略它:
from airflow.utils.dag_cycle_tester import test_cycle as _test_cycle
Run Code Online (Sandbox Code Playgroud)
这将取决于您的测试配置。
| 归档时间: |
|
| 查看次数: |
1584 次 |
| 最近记录: |