我正在运行基于列表的任务。任务 ID 根据列表递增。完成这些任务后我想要执行另一个任务。以下是代码:
with DAG('test',) as dag:
t1 = [PythonOperator(
task_id=f"task_hour_{hours}",
python_callable=hourly_job,
op_kwargs={
"hour": hours
}
) for hours in ['01', '02', '03']
]
t2 = PythonOperator(
task_id="daily",
python_callable=daily_job
)
t1 >> t2
Run Code Online (Sandbox Code Playgroud)
正在发生的情况是,这些hourly任务全部并行运行,然后daily为每个任务执行任务。像这样:
task_hour_01 >> daily
task_hour_02 >> daily
task_hour_03 >> daily
Run Code Online (Sandbox Code Playgroud)
我想要发生的是这些hourly任务应该按顺序执行,最后daily任务应该执行:
task_hour_01 >> task_hour_02 >> task_hour_03 >> daily
Run Code Online (Sandbox Code Playgroud)
所以有两个问题: