如何在unittest中测试气流dag?

mad*_*ad_ 7 python unit-testing pytest airflow

我试图在测试环境中测试一个具有多个任务的dag.我能够测试与dag相关的单个任务,但我想在第一个任务的dag和kick中创建几个任务.为了测试我正在使用的dag中的一个任务

task1.run()

哪个正在执行.但是,当我在dag下游一个接一个地完成许多任务时,同样不起作用.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)

t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)

t2.set_upstream(t1)

t1.run() # It is executing just first task.
Run Code Online (Sandbox Code Playgroud)

为了运行第二个任务,我必须使用t2.run()运行,这是我不想要的,因为我正在设计DAG.怎么做到这一点?

Tay*_*ton 8

我还不完全确定我理解你的问题,但我会开始回答.

如果您的目标是手动运行DAG或其任务的子集,则可以通过CLI实现此目的,例如:

  • $ airflow run ... - 运行任务实例
  • $ airflow test ... - 测试任务实例而不检查数据库中的依赖关系或记录状态
  • $ airflow trigger_dag ... - 触发DAG的特定DAG运行

CLI文档 - https://airflow.apache.org/cli.html

我认为气流运行命令是与您的用例最相关的命令.

在运行时,一旦满足其要求,在DAG中调度任务并运行下游依赖项都由执行程序自动处理.您不需要在代码中的任何位置调用run().

就run方法本身而言,代码仍然存在:

问题

  1. 当你说在测试环境中测试DAG时,你的意思是什么?就像CI或单元测试一样?
  2. 这个代码是来自您的一个实际DAG的测试或代码吗?
  3. 这是否与您最近的其他问题有关?测试Dag在单元测试中运行Airflow 1.9

  • 感谢泰勒抽出时间并提供答案。我必须在单元测试中测试具有依赖项的 dag。因此,我不希望存储在文件中/让 dag 显示在 GUI 上,因为它是测试 dag。我想在单元测试中加载整个 dag 并从那里触发。它曾经在 1.7 中工作,但不太确定我缺少什么才能使其在 1.9 中工作。是的,它与我的其他问题相关,因为它与不同版本相关,因此将两个问题分开以避免混淆。如上例所示运行 dag 的问题是它不尊重依赖关系 (3认同)