我想建立一个这样的气流图:
\n \xe2\x94\x8c\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba task_2_1 \xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba task_2_1\n \xe2\x94\x82\n \xe2\x94\x82\n \xe2\x94\x82\ntask 1 \xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba task_2_2 \xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba task_3_2\n \xe2\x94\x82\n \xe2\x94\x82\n \xe2\x94\x82\n \xe2\x94\x82\n \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba task_2_N \xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x96\xba task_3_N\nRun Code Online (Sandbox Code Playgroud)\n其中垂直任务的数量(N)由第一步计算动态控制。
\n一个可能的解决方案是:
\nfrom airflow import DAG\nfrom airflow.decorators import task, task_group\nfrom pendulum import datetime, now\n\n\n@task\ndef task_1():\n return list(range(5))\n\n\n@task\ndef task_2(task_num):\n return task_num\n\n\n@task\ndef task_3(task_num):\n return task_num\n\n\nwith DAG(dag_id="my_dag", start_date=now(), schedule_interval=None) as dag:\n task_3.expand(task_num=task_2.expand(task_num=task_1()))\nRun Code Online (Sandbox Code Playgroud)\n但task_3 实例仅在所有task_2 实例完成后才开始运行。我宁愿通过共享相同task_num 的task_2 和task_3 实例对任务进行分组。\n直观的解决方案是使用任务组,但它们没有扩展方法。
\n任务组上的动态任务映射(也称为深度优先任务执行)在 Airflow 中尚不可用(当前版本为 2.4.2)
实施延迟到未来版本(请参阅https://github.com/apache/airflow/pull/22518)
解决方法包括将任务 2 和任务 3 移动到单独的 dag 中,并通过在主 dag 中扩展 TriggerDagRunOperator 任务来运行它们。
| 归档时间: |
|
| 查看次数: |
3772 次 |
| 最近记录: |