具有多个链式扩展调用的气流动态任务映射

bru*_*uno 8 dynamic airflow

我想建立一个这样的气流图:

\n
              \xe2\x94\x8c\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba  task_2_1  \xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba  task_2_1\n              \xe2\x94\x82\n              \xe2\x94\x82\n              \xe2\x94\x82\ntask 1 \xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba  task_2_2  \xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba  task_3_2\n              \xe2\x94\x82\n              \xe2\x94\x82\n              \xe2\x94\x82\n              \xe2\x94\x82\n              \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba   task_2_N  \xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba  task_3_N\n
Run Code Online (Sandbox Code Playgroud)\n

其中垂直任务的数量(N)由第一步计算动态控制。

\n

一个可能的解决方案是:

\n
from airflow import DAG\nfrom airflow.decorators import task, task_group\nfrom pendulum import datetime, now\n\n\n@task\ndef task_1():\n    return list(range(5))\n\n\n@task\ndef task_2(task_num):\n    return task_num\n\n\n@task\ndef task_3(task_num):\n    return task_num\n\n\nwith DAG(dag_id="my_dag", start_date=now(), schedule_interval=None) as dag:\n    task_3.expand(task_num=task_2.expand(task_num=task_1()))\n
Run Code Online (Sandbox Code Playgroud)\n

但task_3 实例仅在所有task_2 实例完成后才开始运行。我宁愿通过共享相同task_num 的task_2 和task_3 实例对任务进行分组。\n直观的解决方案是使用任务组,但它们没有扩展方法。

\n

Nab*_*ssi 4

任务组上的动态任务映射(也称为深度优先任务执行)在 Airflow 中尚不可用(当前版本为 2.4.2)

实施延迟到未来版本(请参阅https://github.com/apache/airflow/pull/22518

解决方法包括将任务 2 和任务 3 移动到单独的 dag 中,并通过在主 dag 中扩展 TriggerDagRunOperator 任务来运行它们。