Apache Airflow:在单个 DAG 运行中运行所有并行任务

Edu*_*rev 3 parallel-processing scheduled-tasks task airflow

我有一个 DAG,它有 30 个(或更多)动态创建的并行任务。

concurrency在该 DAG 上设置了选项,以便在追赶历史记录时只运行单个 DAG Run。当我在我的服务器上运行它时,实际上只有 16 个任务并行运行,而其余 14 个只是等待排队。

我应该更改哪个设置,以便我只运行 1 个 DAG Run,但同时运行所有 30 多个任务?

根据这个 FAQ,它似乎是dag_concurrencyor 之一max_active_runs_per_dag,但前者似乎concurrency已经被设置过度驱动,而后者似乎没有效果(或者我实际上弄乱了我的设置)。这是示例代码:

import datetime as dt
import logging

from airflow.operators.dummy_operator import DummyOperator

import config

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
    'retries': 0,
}


def print_operators(ds, **kwargs):
    logging.info(f"Task {kwargs.get('task_instance_key_str', 'unknown_task_instance')}")


dag = DAG(
    dag_id='test_parallelism_dag',
    start_date=dt.datetime(2019, 1, 1),
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True,
    template_searchpath=[config.DAGS_PATH],
    params={'schema': config.SCHEMA_DB},
    max_active_runs=1,
)

print_operators = [PythonOperator(
    task_id=f'test_parallelism_dag.print_operator_{i}',
    python_callable=print_operators,
    provide_context=True,
    dag=dag
) for i in range(60)]

dummy_operator_start = DummyOperator(
    task_id=f'test_parallelism_dag.dummy_operator_start',
)

dummy_operator_end = DummyOperator(
    task_id=f'test_parallelism_dag.dummy_operator_end',
)

dummy_operator_start >> print_operators >> dummy_operator_end

Run Code Online (Sandbox Code Playgroud)

编辑 1:我的当前airflow.cfg包含:

executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26
Run Code Online (Sandbox Code Playgroud)

我的 env 变量如下(将所有变量设置为不同以轻松发现哪一个有帮助):

AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22
Run Code Online (Sandbox Code Playgroud)

有了这个,我有以下甘特图: 在此处输入图片说明

哪种类型给了我一个提示,即设置 DAG_CONCURRENCY 环境变量有效。

Edu*_*rev 5

要更改的实际参数dag_concurrency在airflow.cfg 中或使用AIRFLOW__CORE__DAG_CONCURRENCYenv 变量覆盖它。

根据我在问题中提到的文档

concurrency$concurrency 在任何给定时间,Airflow 调度程序将只为您的 DAG运行任务实例。并发性在您的 Airflow DAG 中定义。如果您没有在 DAG 上设置并发,调度程序将使用dag_concurrency 您的 airflow.cfg 条目中的默认值。

这意味着以下简化代码:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
}


dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
)
Run Code Online (Sandbox Code Playgroud)

应该改写为:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
}


dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
    concurrency=30
)
Run Code Online (Sandbox Code Playgroud)

我的代码实际上有错误的假设,即default_args在某些时候将实际的 kwargs 替换为 DAG 构造函数。我不知道当时是什么让我得出这个结论,但我想设置concurrency1有一些剩余的草稿,它实际上从未影响任何东西,实际的 DAG 并发性是从配置默认值设置的,即 16。