Asked by srv*_*_ER. Tags: python, python-3.x, airflow, airflow-scheduler
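I'm running a set of tasks generated from a list, and the task IDs are numbered according to the list items. Once these tasks have finished, I want to run one more task. Here is the code: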
with DAG('test',) as dag:
    t1 = [PythonOperator(
              task_id=f"task_hour_{hours}",
              python_callable=hourly_job,
              op_kwargs={
                  "hour": hours
              }
          ) for hours in ['01', '02', '03']
    ]
    t2 = PythonOperator(
        task_id="daily",
        python_callable=daily_job
    )
    t1 >> t2
What happens is that the hourly tasks all run in parallel, and daily is wired after each of them, like this:
task_hour_01 >> daily
task_hour_02 >> daily
task_hour_03 >> daily
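To see why `t1 >> t2` yields three separate edges into `daily` instead of a sequence, here is a toy model of the `>>` operator. It is a sketch, not Airflow's code: the `Task` class and the `edges` helper are hypothetical names. The assumption it models is that a plain Python list does not define `>>`, so for `[a, b, c] >> d` Python falls back to `d.__rrshift__([a, b, c])`, which makes every list element an upstream of `d`. Nothing in that step orders the list elements relative to one another.

```python
# Toy model of Airflow-style ">>" dependencies (hypothetical classes, not Airflow's API).
from __future__ import annotations


class Task:
    """A minimal stand-in for an operator: it only records its upstream tasks."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: list[Task] = []

    def set_upstream(self, others: Task | list[Task]) -> None:
        # Accept either a single task or a list of tasks.
        for other in others if isinstance(others, list) else [others]:
            self.upstream.append(other)

    def __rshift__(self, other: Task | list[Task]):
        # task >> other: make `other` (or each element of it) depend on self.
        for target in other if isinstance(other, list) else [other]:
            target.set_upstream(self)
        return other

    def __rrshift__(self, other: list[Task]):
        # [a, b, c] >> task: lists have no ">>", so Python calls task.__rrshift__(list).
        self.set_upstream(other)
        return self


def edges(tasks: list[Task]) -> set[tuple[str, str]]:
    """Return every (upstream_id, downstream_id) pair among the given tasks."""
    return {(up.task_id, t.task_id) for t in tasks for up in t.upstream}


hourly = [Task(f"task_hour_{h}") for h in ["01", "02", "03"]]
daily = Task("daily")
hourly >> daily  # same shape as "t1 >> t2" in the question

for up, down in sorted(edges(hourly + [daily])):
    print(f"{up} >> {down}")
```

The hourly tasks end up with no upstream tasks at all, so the scheduler is free to run them in parallel.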
What I want instead is for the hourly tasks to run sequentially, with the daily task running at the end:
task_hour_01 >> task_hour_02 >> task_hour_03 >> daily
So there are two questions:
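Your code is on the right track; you are only missing the dependencies between the hourly tasks themselves, which you can set with chain: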
from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator


# Replace with your function logic; it must accept the "hour" keyword passed via op_kwargs
def hourly_job(hour):
    return f'hourly {hour}'


# Replace with your function logic
def daily_job():
    return 'daily'


with DAG(dag_id='test', start_date=datetime(2022, 2, 16)) as dag:
    hours = ['01', '02', '03']
    op_list = [
        PythonOperator(task_id=f"task_hour_{hour}", python_callable=hourly_job, op_kwargs={"hour": hour})
        for hour in hours
    ]
    # Sets task_hour_01 >> task_hour_02 >> task_hour_03
    chain(*op_list)

    t2 = PythonOperator(
        task_id="daily",
        python_callable=daily_job
    )
    # daily runs only after the last hourly task
    op_list[-1] >> t2
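`chain(*op_list)` links each task to the next one, which is what makes the hourly tasks sequential. The toy sketch below reimplements that pairwise idea for single tasks only. `Task`, `toy_chain` and `edges` are hypothetical names, and Airflow's real `chain` also accepts lists of tasks, which this sketch leaves out. It also checks that passing the daily task as the last argument (`chain(*op_list, t2)` in the DAG) produces the same graph as calling `chain(*op_list)` and then `op_list[-1] >> t2`.

```python
# Toy sketch of what chain() does for a flat sequence of single tasks.
# Task, toy_chain and edges are hypothetical names, not Airflow's API.
from __future__ import annotations


class Task:
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: list[Task] = []

    def __rshift__(self, other: Task) -> Task:
        # a >> b records b as downstream of a and returns b, so a >> b >> c chains.
        self.downstream.append(other)
        return other


def toy_chain(*tasks: Task) -> None:
    # Link every task to the next one: t0 >> t1, t1 >> t2, ...
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream


def edges(tasks: list[Task]) -> list[tuple[str, str]]:
    """Return (upstream_id, downstream_id) pairs in insertion order."""
    return [(t.task_id, d.task_id) for t in tasks for d in t.downstream]


# Variant from the answer: chain the hourly tasks, then attach daily to the last one.
hourly_a = [Task(f"task_hour_{h}") for h in ["01", "02", "03"]]
daily_a = Task("daily")
toy_chain(*hourly_a)
hourly_a[-1] >> daily_a

# Variant with daily passed as the final argument.
hourly_b = [Task(f"task_hour_{h}") for h in ["01", "02", "03"]]
daily_b = Task("daily")
toy_chain(*hourly_b, daily_b)

print(edges(hourly_a + [daily_a]))
print(edges(hourly_a + [daily_a]) == edges(hourly_b + [daily_b]))
```

In the DAG, this means you could define `t2` first and call `chain(*op_list, t2)` once. The two forms are equivalent, so the choice is a matter of taste.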