如何安排 DAG 并行运行一些任务,然后在任务完成后运行一项任务?

CCl*_*rke 1 airflow google-cloud-composer

我有几个可以同时运行的任务。当他们完成后,我需要运行最后一个任务。我尝试使用任务分组来做到这一点,如下所示:

import airflow
from airflow.utils.task_group import TaskGroup

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    with TaskGroup(group_id='task_group_1') as tg1:

    
        task1 = MyOperator(
            task_id='task1',
            dag=dag,
        )

        task2 = MyOperator(
            task_id='task2',
            dag=dag,
        )
        
        [task1, task2]    
    
    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    tg1 >> final_task
   
Run Code Online (Sandbox Code Playgroud)

然而,这里发生的情况是 Final_task 在任务组中的每个任务之后运行多次,因此:

任务1 -> 最终任务 任务2 -> 最终任务

我想要的是任务组并行运行,并且当最终任务完成时只运行一次,如下所示:

[任务1,任务2] -> 最终任务

我认为使用任务组可以帮助我完成此要求,但它没有按预期工作。有人可以帮忙吗?谢谢。

编辑:这是 Airflow 文档示例的结果。它导致task3在group.task1和group1.task2之后运行。我需要它在两个分组任务完成后只运行一次。

在此输入图像描述

最后编辑:事实证明我误解了树视图 - 图形视图确认了分组操作,尽管我仍然收到最终任务的一些其他错误。感谢您帮助我了解有关 DAG 的更多信息。

Sim*_*onD 5

尝试从任务组中删除[task1, task2],使其如下所示:

import airflow
from airflow.utils.task_group import TaskGroup

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    with TaskGroup(group_id='task_group_1') as tg1:
        task1 = MyOperator(
            task_id='task1',
            dag=dag,
        )

        task2 = MyOperator(
            task_id='task2',
            dag=dag,
        )
        
    
    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    tg1 >> final_task
Run Code Online (Sandbox Code Playgroud)

我认为您不需要像您所做的那样从任务组返回任何内容。只需将 TaskGroup 作为依赖项引用即可。

以下是 apache airflow 文档中的示例:

with TaskGroup("group1") as group1:
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")

task3 = EmptyOperator(task_id="task3")

group1 >> task3
Run Code Online (Sandbox Code Playgroud)

此外,您不需要使用任务组来实现此功能。你可以简单地这样做:

import airflow
from airflow.utils.task_group import TaskGroup

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    task1 = MyOperator(
        task_id='task1',
        dag=dag,
    )

    task2 = MyOperator(
        task_id='task2',
        dag=dag,
    )
        
    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    task1 >> final_task
    task2 >> final_task
Run Code Online (Sandbox Code Playgroud)

  • 我想说你需要在你的问题中添加图像,因为不清楚你所说的重复是什么意思。 (2认同)