Airflow:并行链接任务

Art*_*ano 2 python parallel-processing airflow

我目前正在开发一个 DAG,需要每月循环处理一长串任务。

为了实现这一点,我创建一个空列表,然后循环多个任务,根据新的月份更改它们的 task_id。

例子:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain

dag = DAG(
    "import_trx_table",
    default_args=default_args,
    schedule_interval="45 9 * * *",
)

task_list = []

for month_ds in month_lst_ds:
    start = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)

    task_list.append(start)

...

chain(*task_list)
Run Code Online (Sandbox Code Playgroud)

问题是 DAG 现在非常长(而且很慢)(因为我迭代了超过 12 个月)。其中一些任务肯定可以并行运行。

我试图检查链辅助函数是否有并行链接的方式,但找不到任何东西。

有什么建议/想法吗?

谢谢。

Ela*_*lad 7

您可以使用chain,但它在这里并没有真正提供价值。

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 6, 7),
}

dag = DAG(
    "stackoverflow_question",
    default_args=default_args,
    schedule_interval="@daily",
)

month_lst_ds = ['Dec', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov']

start_op = DummyOperator(task_id='start_task', dag=dag)
end_op = DummyOperator(task_id='end_task', dag=dag)
for month_ds in month_lst_ds:
    month_op = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)
    start_op >> month_op >> end_op 
Run Code Online (Sandbox Code Playgroud)

如果您愿意,可以更换

start_op >> month_op >> end_op 
Run Code Online (Sandbox Code Playgroud)

chain(start_op, month_op, end_op )
Run Code Online (Sandbox Code Playgroud)

这是 DAG 结构在图形视图中的外观:

在此输入图像描述

  • @ArturoBelano 我偶尔会遇到像这样的大规模并行结构的遗漏问题,但假设这些问题与 Airflow 版本或 Airflow 并行处理的限制有关。埃拉德是对的,调试结构应该是一个单独的问题,除非他显示的结构与您的预期有很大不同。 (2认同)