CCl*_*rke 1 airflow google-cloud-composer
我有几个可以同时运行的任务。当他们完成后,我需要运行最后一个任务。我尝试使用任务分组来做到这一点,如下所示:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
with TaskGroup(group_id='task_group_1') as tg1:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
[task1, task2]
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
tg1 >> final_task
Run Code Online (Sandbox Code Playgroud)
然而,这里发生的情况是 Final_task 在任务组中的每个任务之后运行多次,因此:
任务1 -> 最终任务 任务2 -> 最终任务
我想要的是任务组并行运行,并且当最终任务完成时只运行一次,如下所示:
[任务1,任务2] -> 最终任务
我认为使用任务组可以帮助我完成此要求,但它没有按预期工作。有人可以帮忙吗?谢谢。
编辑:这是 Airflow 文档示例的结果。它导致task3在group.task1和group1.task2之后运行。我需要它在两个分组任务完成后只运行一次。
最后编辑:事实证明我误解了树视图 - 图形视图确认了分组操作,尽管我仍然收到最终任务的一些其他错误。感谢您帮助我了解有关 DAG 的更多信息。
尝试从任务组中删除[task1, task2],使其如下所示:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
with TaskGroup(group_id='task_group_1') as tg1:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
tg1 >> final_task
Run Code Online (Sandbox Code Playgroud)
我认为您不需要像您所做的那样从任务组返回任何内容。只需将 TaskGroup 作为依赖项引用即可。
with TaskGroup("group1") as group1:
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
group1 >> task3
Run Code Online (Sandbox Code Playgroud)
此外,您不需要使用任务组来实现此功能。你可以简单地这样做:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
task1 >> final_task
task2 >> final_task
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3308 次 |
| 最近记录: |