DagRun not found when unit testing a custom operator in Airflow

7 python python-3.x airflow airflow-scheduler

I wrote a custom operator (DataCleaningOperator) that corrects JSON data based on a provided schema.

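Previously, the unit tests worked because I didn't need to instantiate a TaskInstance or pass a context to the operator. However, I recently updated the operator to take the context (so that it can use xcom_push).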

Here is an example of one of the tests:

```python
import unittest
from copy import deepcopy
from datetime import datetime
from typing import List

from airflow import DAG
from airflow.models import TaskInstance

# Import path assumed from the project layout (plugins/operators/...)
from operators.data_cleaning_operator import DataCleaningOperator

DEFAULT_DATE = datetime.today()


class TestDataCleaningOperator(unittest.TestCase):
    """
    Class to execute unit tests for the operator 'DataCleaningOperator'.
    """

    def setUp(self) -> None:
        super().setUp()
        self.dag = DAG(
            dag_id="test_dag_data_cleaning",
            schedule_interval=None,
            default_args={
                "owner": "airflow",
                "start_date": DEFAULT_DATE,
                "output_to_xcom": True,
            },
        )
        self._initialise_test_data()

    def _initialise_test_data(self) -> None:
        # Test data set here as class variables such as self.test_data_correct
        ...

    def test_operator_cleans_dataset_which_matches_schema(self) -> None:
        """
        Test: Attempt to clean a dataset which matches the provided schema.
        Verification: Returns the original dataset, unchanged.
        """
        task = DataCleaningOperator(
            task_id="test_operator_cleans_dataset_which_matches_schema",
            schema_fields=self.test_schema_nest,
            data_file_object=deepcopy(self.test_data_correct),
            dag=self.dag,
        )
        # On Airflow 2.2.3 this line raises DagRunNotFound
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
        result: List[dict] = task.execute(ti.get_template_context())
        self.assertEqual(result, self.test_data_correct)
```

However, when I run the test, I get the following error:

```
airflow.exceptions.DagRunNotFound: DagRun for 'test_dag_data_cleaning' with date 2022-02-22 12:09:51.538954+00:00 not found
```

The error is raised by the line in `test_operator_cleans_dataset_which_matches_schema` that instantiates the TaskInstance.

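Why can't Airflow find the `test_dag_data_cleaning` DAG? Is there a specific configuration I've missed? If this test DAG lives outside my standard DAG directory, do I also need to create a DagRun instance or manually add the DAG to the DagBag? All the normal (non-test) DAGs in my dags directory run correctly.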

If it helps, my current Airflow version is 2.2.3, and my project structure is:

```
airflow
├─ dags
├─ plugins
|  ├─ ...
|  └─ operators
|     ├─ ...
|     └─ data_cleaning_operator.py
|
└─ tests
   ├─ ...
   └─ operators
      └─ test_data_cleaning_operator.py
```

anz*_*ana 6

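The code in the question is written in the Airflow 2.0 style of unit testing. Since Airflow 2.2, a TaskInstance belongs to a DagRun (identified by its run_id): constructing `TaskInstance(task=task, execution_date=...)` looks up an existing DagRun for that DAG and date and raises `DagRunNotFound` if there is none. So on Airflow 2.2.3 you need to create the DagRun first and then get the task instance from it.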
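A simplified plain-Python model of why creating the run first helps: the TaskInstance lookup needs an existing run for the same DAG id and the exact same date. This is an illustration only, not Airflow's code; the `dag_runs` dict stands in for the `dag_run` table, and `create_dagrun`, `lookup_run_id` and the run_id string format are hypothetical names chosen for this sketch.

```python
from datetime import datetime, timezone
from typing import Dict, Tuple


class DagRunNotFound(Exception):
    """Stand-in for airflow.exceptions.DagRunNotFound."""


# Hypothetical in-memory stand-in for the dag_run table:
# (dag_id, execution_date) -> run_id
dag_runs: Dict[Tuple[str, datetime], str] = {}


def create_dagrun(dag_id: str, execution_date: datetime) -> str:
    """Register a run, roughly what DAG.create_dagrun() persists."""
    run_id = f"manual__{execution_date.isoformat()}"  # illustrative run_id
    dag_runs[(dag_id, execution_date)] = run_id
    return run_id


def lookup_run_id(dag_id: str, execution_date: datetime) -> str:
    """Mimic the check TaskInstance(task, execution_date=...) does in 2.2."""
    run_id = dag_runs.get((dag_id, execution_date))
    if run_id is None:
        raise DagRunNotFound(
            f"DagRun for {dag_id!r} with date {execution_date} not found"
        )
    return run_id


when = datetime(2022, 3, 4, tzinfo=timezone.utc)

try:
    lookup_run_id("test_dag_data_cleaning", when)  # no run exists yet
except DagRunNotFound as exc:
    print(exc)

create_dagrun("test_dag_data_cleaning", when)     # create the run first...
print(lookup_run_id("test_dag_data_cleaning", when))  # ...then the lookup succeeds
```

Because the key is an exact timestamp, the DagRun has to be created with the same `execution_date` that the task instance is later looked up with.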

Here is example code that worked for me:

```python
import unittest

import pendulum
from airflow import DAG
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

from operators.test_operator import EvenNumberCheckOperator

# Fixed, timezone-aware date used for both the DAG and the DagRun
DEFAULT_DATE = pendulum.datetime(2022, 3, 4, tz='America/Toronto')
TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"


class TestEvenNumberCheckOperator(unittest.TestCase):

    def setUp(self):
        super().setUp()
        self.dag = DAG('test_dag4', default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE})
        self.even = 10
        self.odd = 11
        # Attach the operator under test to the DAG
        EvenNumberCheckOperator(
            task_id=TEST_TASK_ID,
            my_operator_param=self.even,
            dag=self.dag
        )

    def test_even(self):
        """Tests that the EvenNumberCheckOperator returns True for 10."""
        # Create the DagRun first; this also creates its task instances
        dagrun = self.dag.create_dagrun(state=DagRunState.RUNNING,
                                        execution_date=DEFAULT_DATE,
                                        # data_interval, if passed, must be a (start, end) tuple
                                        start_date=DEFAULT_DATE,
                                        run_type=DagRunType.MANUAL)
        # Fetch the task instance from the run and re-attach the task object
        ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
        ti.task = self.dag.get_task(task_id=TEST_TASK_ID)
        result = ti.task.execute(ti.get_template_context())
        assert result is True
```
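If you only want to test the logic inside `execute()` rather than Airflow's scheduling machinery, another option (not the approach in this answer) is to skip the metadata database entirely and pass `execute()` a hand-built context whose `"ti"` entry is a `unittest.mock.MagicMock`. In this sketch, `HypotheticalCleaningOperator`, its `output_to_xcom` flag and the `cleaned_data` XCom key are hypothetical stand-ins; the real `DataCleaningOperator` would subclass `airflow.models.BaseOperator`.

```python
from typing import Any, Dict, List
from unittest import mock


class HypotheticalCleaningOperator:
    """Hypothetical stand-in for DataCleaningOperator (no Airflow import needed)."""

    def __init__(self, data: List[dict], output_to_xcom: bool = True) -> None:
        self.data = data
        self.output_to_xcom = output_to_xcom

    def execute(self, context: Dict[str, Any]) -> List[dict]:
        # Placeholder for the real schema-based cleaning: copy each row unchanged
        cleaned = [dict(row) for row in self.data]
        if self.output_to_xcom:
            # Same call shape as TaskInstance.xcom_push(key, value)
            context["ti"].xcom_push(key="cleaned_data", value=cleaned)
        return cleaned


data = [{"id": 1, "name": "a"}]
context = {"ti": mock.MagicMock()}  # only the keys the operator reads
result = HypotheticalCleaningOperator(data).execute(context)

assert result == data
context["ti"].xcom_push.assert_called_once_with(key="cleaned_data", value=data)
```

The trade-off: the mock context contains only the keys you put in it, templated fields are not rendered and nothing is written to XCom, so the DagRun-based test above exercises more of Airflow.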