我们如何验证 DAG 对象中是否不存在循环

Nic*_*asi 3 pytest directed-acyclic-graphs airflow

我正在为我的 ETL 编写单元测试,作为一个过程,我想测试所有 Dags 以确保它们没有循环。在阅读 Bas Harenslak 和 Julian de Ruiter 撰写的 Data Pipelines with Apache Airflow 后,我发现他们正在使用 DAG.test_cycle(),这里的 DAG 是从模块 airflow.models.dag 导入的,但是当我运行代码时,我收到一个错误 AttributeError :“DAG”对象没有属性“test_cycle”

\n

这是我的代码片段

\n
import glob\nimport importlib\nimport os\n\nimport pytest\nfrom airflow.models.dag import DAG\n\nDAG_PATH = os.path.join(os.path.dirname(file), \xe2\x80\x9c\xe2\x80\xa6\xe2\x80\x9d, \xe2\x80\x9c\xe2\x80\xa6\xe2\x80\x9d, \xe2\x80\x9cdags/**/*.py\xe2\x80\x9d)\nDAG_FILES = glob.glob(DAG_PATH, recursive=True)\n\n@pytest.mark.parametrize("dag_file", DAG_FILES)\ndef test_dag_integrity(dag_file):\n    module_name, _ = os.path.splitext(dag_file)\n    module_path = os.path.join(DAG_PATH, dag_file)\n    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)\n    module = importlib.util.module_from_spec(mod_spec)\n    mod_spec.loader.exec_module(module)\n\n    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]\n\n    assert dag_objects\n\n    for dag in dag_objects:\n        dag.test_cycle()\n
Run Code Online (Sandbox Code Playgroud)\n

Nic*_*coE 7

在 Airflow2.0.0或更高版本中,您可以使用test_cycle()以 a 作为参数的函数dag

def test_cycle(dag):
    """
    Check to see if there are any cycles in the DAG. Returns False if no cycle found,
    otherwise raises exception.
    """
Run Code Online (Sandbox Code Playgroud)

来源

像这样导入:

from airflow.utils.dag_cycle_tester import test_cycle

您可以在DagBag类的定义中找到一个示例。

  • 由于某种原因,我在运行此程序时收到以下错误。关于如何解决的任何想法?def test_cycle(dag):E固定装置“dag”未找到>可用固定装置:anyio_backend,anyio_backend_name,anyio_backend_options,cache,capfd,capfdbinary,caplog,capsys,capsysbinary,celery_app,celery_class_tasks,c (3认同)
  • @adan11你的问题是,当你在 pytest 套件中的任何名称以“test”开头的函数中放置一个参数时,pytest 认为这是一个测试,并假设传递的参数是固定装置。如果您有较新版本的 Airflow,此函数已重命名为“check_cycle”。如果不这样做,只需在导入时为函数添加别名,就像 Makr 在本页的答案中所做的那样。 (2认同)

小智 5

如果有人在使用 test_cycle 方法时遇到 @adan11 提到的相同问题,问题就出在我们编译测试套件(即我们的测试工具)时。pytest,将在导入时解析 test_cycle 方法。

例如

# Pytest will collect this function
from airflow.utils.dag_cycle_tester import test_cycle
Run Code Online (Sandbox Code Playgroud)

当我们使用如下所示的别名时,Pytest 将忽略它:

from airflow.utils.dag_cycle_tester import test_cycle as _test_cycle
Run Code Online (Sandbox Code Playgroud)

这将取决于您的测试配置。