mad*_*ad_ 7 python unit-testing pytest airflow
我试图在测试环境中测试一个具有多个任务的dag.我能够测试与dag相关的单个任务,但我想在第一个任务的dag和kick中创建几个任务.为了测试我正在使用的dag中的一个任务
task1.run()
哪个正在执行.但是,当我在dag下游一个接一个地完成许多任务时,同样不起作用.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
t2.set_upstream(t1)
t1.run() # It is executing just first task.
Run Code Online (Sandbox Code Playgroud)
为了运行第二个任务,我必须使用t2.run()运行,这是我不想要的,因为我正在设计DAG.怎么做到这一点?
我还不完全确定我理解你的问题,但我会开始回答.
如果您的目标是手动运行DAG或其任务的子集,则可以通过CLI实现此目的,例如:
$ airflow run ... - 运行任务实例$ airflow test ... - 测试任务实例而不检查数据库中的依赖关系或记录状态$ airflow trigger_dag ... - 触发DAG的特定DAG运行CLI文档 - https://airflow.apache.org/cli.html
我认为气流运行命令是与您的用例最相关的命令.
在运行时,一旦满足其要求,在DAG中调度任务并运行下游依赖项都由执行程序自动处理.您不需要在代码中的任何位置调用run().
就run方法本身而言,代码仍然存在:
问题