AIrflow - 跨多个文件拆分DAG定义

R.M*_*.M. 12 airflow

刚刚开始使用Airflow,并想知道构建大型DAG的最佳实践是什么.对于我们的ETL,我们有许多属于逻辑分组的任务,但这些组依赖于彼此.以下哪项被认为是最佳做法?

  • 一个大型DAG文件,其中包含该文件中的所有任务
  • 将DAG定义拆分为多个文件(如何执行此操作?)
  • 定义多个DAG,每个任务组一个,并使用ExternalTask​​Sensor设置它们之间的依赖关系

也欢迎其他建议.

Vin*_*oel 11

DAG只是python文件.因此,您可以将单个dag定义拆分为多个文件.不同的文件应该只有接受dag对象的方法,并使用该dag对象创建任务.

但请注意,您应该只在全局范围内使用一个dag对象.Airflow将全局范围内的所有dag对象作为单独的dag拾取.

通常认为保持每个dag尽可能简洁是一种好习惯.但是,如果您需要设置此类依赖项,则可以考虑使用子标记.有关这方面的更多信息,请访问:https://airflow.incubator.apache.org/concepts.html?highlight = subdag #scope

您也可以使用ExternalTask​​Sensor,但要注意随着dag数量的增加,处理任务之间的外部依赖关系可能会变得更加困难.我认为子标签可能是你用例的方法.


L. *_*May 7

随着 Airflow 2.x 中任务组的出现,值得对之前的答案进行扩展。任务组只是任务的 UI 分组,但它们也可以作为一堆相关任务的方便的逻辑分组。TaskGroup 中的任务可以进行捆绑和抽象,以便更轻松地用较大的部分构建 DAG。话虽这么说,拥有一个充满相关任务的文件而不将它们捆绑到任务组中可能仍然有用。

例如,分解 DAG 的技巧是将 DAG 放在一个文件中,my_dag.py将任务或任务组的逻辑块放在单独的文件中,每个文件一个逻辑任务块或任务组。每个文件都包含函数(或方法,如果您想采用 OO 方法),每个函数都会返回一个运算符实例或一个 TaskGroup 实例。

为了说明这一点,my_dag.py(如下)从 导入运算符返回函数foo_bar_tasks.py,并从 导入任务组返回函数xyzzy_taskgroup.py。在 DAG 上下文中,调用这些函数并将它们的返回值分配给任务或任务组变量,这些变量可以分配上游/下游依赖项。

dags/my_dag.py

# std lib imports
 
from airflow import DAG
# other airflow imports
 
from includes.foo_bar_tasks import build_foo_task, build_bar_task
from includes.xyzzy_taskgroup import build_xyzzy_taskgroup
 
with DAG(dag_id="my_dag", ...) as dag:
 
    # logical chunk of tasks
    foo_task = build_foo_task(dag=dag, ...)
    bar_task = build_bar_task(dag=dag, ...)
 
    # taskgroup
    xyzzy_taskgroup = build_xyzzy_taskgroup(dag=dag, ...)
 
    foo_task >> bar_task >> xyzzy_taskgroup
Run Code Online (Sandbox Code Playgroud)

plugins/includes/foo_bar_tasks.py

# std lib imports
 
from airflow import DAG
from airflow.operators.foo import FooOperator
from airflow.operators.bar import BarOperator
# other airflow imports
 
def build_foo_task(dag: DAG, ...) -> FooOperator:
    # ... logic here ...
    foo_task = FooOperator(..., dag=dag)
 
    return foo_task
 
def build_bar_task(dag: DAG, ...) -> BarOperator:
    # ... logic here ...
    bar_task = BarOperator(..., dag=dag)
 
    return bar_task
Run Code Online (Sandbox Code Playgroud)

plugins/includes/xyzzy_taskgroup.py

# std lib imports
 
from airflow import DAG
from airflow.operators.baz import BazOperator
from airflow.operators.qux import QuxOperator
from airflow.utils import TaskGroup
# other airflow imports
 
def build_xyzzy_taskgroup(dag: DAG, ...) -> TaskGroup:
    xyzzy_taskgroup = TaskGroup(group_id="xyzzy_taskgroup")
 
    # ... logic here ...
    baz_task = BazOperator(task_id="baz_task", task_group=xyzzy_taskgroup, ...)
 
    # ... logic here ...
    qux_task = QuxOperator(task_id="qux_task", task_group=xyzzy_taskgroup, ...)
 
    baz_task >> qux_task
 
    return xyzzy_taskgroup
Run Code Online (Sandbox Code Playgroud)