Airflow: dynamically generating tasks with the TaskFlow API

EdG*_*EdG 1 airflow

Previously I used the following snippet to generate tasks dynamically:

dummy_start_task = PythonOperator(
    task_id="dummy_start",
    default_args=default_args,
    python_callable=dummy_start,
    dag=dag
)

make_images_tasks = list()
for n in range(WORKERS):
    globals()[f"make_images_{n}_task"] = PythonOperator(
        task_id=f'make_images_{n}',
        default_args=default_args,
        python_callable=make_images,
        op_kwargs={"n": n},
        dag=dag
    )
    make_images_tasks.append(globals()[f"make_images_{n}_task"])

dummy_collector_task = PythonOperator(
    task_id="dummy_collector",
    default_args=default_args,
    python_callable=dummy_collector,
    dag=dag
)

dummy_start_task >> make_images_tasks >> dummy_collector_task

# In collector_task I would use:
# items = task_instance.xcom_pull(task_ids=[f"make_images_{n}" for n in range(int(WORKERS))])
# to get the XComs from these dynamically generated tasks.

How can I achieve this with the TaskFlow API? (That is, generate multiple tasks and then read their XComs in a downstream collector task.)

Bas*_*lak 6

Here is an example:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="example_taskflow", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:

    @task
    def dummy_start_task():
        pass

    tasks = []
    for n in range(3):

        @task(task_id=f"make_images_{n}")
        def images_task(i):
            return i

        tasks.append(images_task(n))

    @task
    def dummy_collector_task(tasks):
        print(tasks)

    dummy_start_task_ = dummy_start_task()
    dummy_start_task_ >> tasks
    dummy_collector_task(tasks)

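This produces a DAG in which dummy_start_task fans out to make_images_0, make_images_1, and make_images_2, which all feed into dummy_collector_task.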

The make_images_* tasks take 0, 1, and 2 as input (the same value is also used in each task's id) and return it. dummy_collector_task receives the outputs of all make_images_* tasks and prints [0, 1, 2].