在 Apache Airflow 中运行超过 32 个并发任务

duf*_*ffn 4 python airflow

我正在运行 Apache Airflow 1.8.1。我想在我的实例上运行超过 32 个并发任务,但无法使任何配置工作。

我正在使用 CeleryExecutor,UI 中的 Airflow 配置显示为 64 parallelismdag_concurrency并且我已经多次重新启动 Airflow 调度程序、Web 服务器和工作程序(我实际上是在 Vagrant 机器上本地测试这个,但也在一个 EC2 实例)。

气流.cfg

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 64

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 64
Run Code Online (Sandbox Code Playgroud)

示例 DAG。我已经concurrency直接在 DAG 中尝试了没有和有参数的情况。

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'concurrency_dev',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2018, 1, 1),
    },
    schedule_interval=None,
    catchup=False
)

for i in range(0, 40):
    BashOperator(
        task_id='concurrency_dev_{i}'.format(i=i),
        bash_command='sleep 60',
        dag=dag
    )
Run Code Online (Sandbox Code Playgroud)

无论如何,只有 32 个任务是同时执行的。

在此处输入图片说明

dla*_*lin 5

如果您有 2 个工人,celeryd_concurrency = 16那么您只能执行 32 个任务。如果non_pooled_task_slot_count = 32你也受到限制。当然parallelism,不仅dag_concurrency需要在网络服务器和调度器上设置为 32 以上,而且在工作器上也需要设置为 32 以上。