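I wrote a custom operator (DataCleaningOperator) that corrects JSON data based on a provided schema.

The unit tests used to work when I didn't need to instantiate a TaskInstance and pass a context to the operator. However, I recently updated the operator to take the context (so that it can use xcom_push).

Here is an example of one of the tests: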
```python
import unittest
from copy import deepcopy
from datetime import datetime
from typing import List

from airflow import DAG
from airflow.models import TaskInstance

# Module path assumes the plugins/ folder is on sys.path (Airflow adds it).
from operators.data_cleaning_operator import DataCleaningOperator

DEFAULT_DATE = datetime.today()


class TestDataCleaningOperator(unittest.TestCase):
    """
    Class to execute unit tests for the operator 'DataCleaningOperator'.
    """

    def setUp(self) -> None:
        super().setUp()
        self.dag = DAG(
            dag_id="test_dag_data_cleaning",
            schedule_interval=None,
            default_args={
                "owner": "airflow",
                "start_date": DEFAULT_DATE,
                "output_to_xcom": True,
            },
        )
        self._initialise_test_data()

    def _initialise_test_data(self) -> None:
        # Test data is set here as instance attributes such as self.test_data_correct
        ...

    def test_operator_cleans_dataset_which_matches_schema(self) -> None:
        """
        Test: Attempt to clean a dataset which matches the provided schema.
        Verification: Returns the original dataset, unchanged.
        """
        task = DataCleaningOperator(
            task_id="test_operator_cleans_dataset_which_matches_schema",
            schema_fields=self.test_schema_nest,
            data_file_object=deepcopy(self.test_data_correct),
            dag=self.dag,
        )
        # On Airflow 2.2.3 this line raises DagRunNotFound
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
        result: List[dict] = task.execute(ti.get_template_context())
        self.assertEqual(result, self.test_data_correct)
```

However, when the test runs, the following error is raised:
```
airflow.exceptions.DagRunNotFound: DagRun for 'test_dag_data_cleaning' with date 2022-02-22 12:09:51.538954+00:00 not found
```

It comes from the line in test_operator_cleans_dataset_which_matches_schema that instantiates the TaskInstance.
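Why can't Airflow find the test_dag_data_cleaning DAG? Is there a specific configuration I've missed? Since this test DAG lives outside my standard DAG directory, do I also need to create a DAG run instance, or manually add the DAG to the DagBag? All the normal (non-test) DAGs in my dags directory run correctly.

In case it helps, I'm on Airflow 2.2.3 and my project structure is: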
```
airflow
├─ dags
├─ plugins
|  ├─ ...
|  └─ operators
|     ├─ ...
|     └─ data_cleaning_operator.py
|
└─ tests
   ├─ ...
   └─ operators
      └─ test_data_cleaning_operator.py
```
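Your test is written in the Airflow 2.0 style. Starting with Airflow 2.2, a TaskInstance belongs to a DagRun (it is keyed by run_id), so `TaskInstance(task=task, execution_date=DEFAULT_DATE)` looks up an existing DagRun for that date and raises DagRunNotFound when there is none. After upgrading to 2.2.3, the test therefore has to create a DagRun first and take the task instance from it. The DAG itself does not need to be in your dags folder or added to a DagBag.

Here is example code that worked for me: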
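```python
import unittest

import pendulum
from airflow import DAG
from airflow.models import DagRun, TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

from operators.test_operator import EvenNumberCheckOperator

DEFAULT_DATE = pendulum.datetime(2022, 3, 4, tz='America/Toronto')
TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"


def _clear_test_runs():
    # dag_run is unique on (dag_id, execution_date), so rows left over from a
    # previous run would make create_dagrun fail when the test is re-run.
    with create_session() as session:
        session.query(TaskInstance).filter(TaskInstance.dag_id == TEST_DAG_ID).delete()
        session.query(DagRun).filter(DagRun.dag_id == TEST_DAG_ID).delete()


class TestEvenNumberCheckOperator(unittest.TestCase):
    def setUp(self):
        super().setUp()
        _clear_test_runs()
        self.dag = DAG(TEST_DAG_ID, default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE})
        self.even = 10
        self.odd = 11
        EvenNumberCheckOperator(
            task_id=TEST_TASK_ID,
            my_operator_param=self.even,
            dag=self.dag,
        )

    def tearDown(self):
        _clear_test_runs()
        super().tearDown()

    def test_even(self):
        """Tests that the EvenNumberCheckOperator returns True for 10."""
        # Airflow 2.2+: a task instance must belong to a DagRun, so create one first.
        dagrun = self.dag.create_dagrun(
            state=DagRunState.RUNNING,
            execution_date=DEFAULT_DATE,
            # data_interval is optional here; if passed, it is a (start, end) tuple.
            start_date=DEFAULT_DATE,
            run_type=DagRunType.MANUAL,
        )
        ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
        # The task instance loaded from the database has no operator attached.
        ti.task = self.dag.get_task(task_id=TEST_TASK_ID)
        result = ti.task.execute(ti.get_template_context())
        assert result is True
```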
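If the operator only uses the context to reach `ti.xcom_push`, a lighter alternative to creating a DagRun is to pass `execute` a hand-built context whose `"ti"` entry is a `unittest.mock.MagicMock`, so no metadata database is touched. The sketch below assumes that. `StubCleaningOperator`, its "drop empty records" rule and the XCom key `cleaned_count` are hypothetical stand-ins because the real `DataCleaningOperator` is not shown. With the real operator you would build it as in the question and pass the same kind of dict. This only holds as long as `execute` reads nothing else from the context (for example `dag_run` or `ds`).

```python
import unittest
from copy import deepcopy
from typing import Any, Dict, List
from unittest import mock


class StubCleaningOperator:
    """Hypothetical stand-in for DataCleaningOperator (its code is not shown).

    It mimics the relevant part of an Airflow operator: execute(context)
    receives the context dict and pushes a value to XCom via context["ti"].
    """

    def __init__(self, task_id: str, data_file_object: List[dict]) -> None:
        self.task_id = task_id
        self.data_file_object = data_file_object

    def execute(self, context: Dict[str, Any]) -> List[dict]:
        # Hypothetical cleaning rule: drop empty records.
        cleaned = [row for row in self.data_file_object if row]
        # Hypothetical XCom key; the real operator may push something else.
        context["ti"].xcom_push(key="cleaned_count", value=len(cleaned))
        return cleaned


class TestExecuteWithMockedContext(unittest.TestCase):
    def test_execute_pushes_to_xcom(self) -> None:
        data = [{"id": 1}, {}, {"id": 2}]
        task = StubCleaningOperator(task_id="clean", data_file_object=deepcopy(data))

        # MagicMock accepts any xcom_push call and records it, so no DAG,
        # DagRun or metadata database is involved.
        context = {"ti": mock.MagicMock()}
        result = task.execute(context)

        self.assertEqual(result, [{"id": 1}, {"id": 2}])
        context["ti"].xcom_push.assert_called_once_with(key="cleaned_count", value=2)
```

Because the mock records every call, `assert_called_once_with` verifies the XCom push directly; the trade-off is that nothing checks the operator against a real Airflow context.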