如何控制Airflow DAG的并行性或并发性?

hex*_*ide 4 python airflow google-cloud-composer

在我的某些Airflow安装中,即使未完全加载调度程序,调度运行的DAG或任务也不会运行。如何增加可以同时运行的DAG或任务的数量?

同样,如果我的安装负载很高,并且我想限制Airflow工作人员拉出排队任务的速度,我该如何调整?

hex*_*ide 12

这是Airflow v1.10.2中可用的配置选项的扩展列表。可以在每个DAG或每个操作员的基础上进行设置,如果未指定,则可能会退回到设置范围的默认值。


可以基于每个DAG指定的选项:

  • concurrency:设置为允许在DAG的所有活动运行中同时运行的任务实例数。core.dag_concurrency如果未设置,则默认为
  • max_active_runs:此DAG的最大活动运行次数。一旦达到此限制,调度程序将不会创建新的活动DAG运行。core.max_active_runs_per_dag如果未设置,则默认为

例子:

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)
Run Code Online (Sandbox Code Playgroud)

可以在每个操作员的基础上指定的选项:

  • pool:用于在其中执行任务的只能用于限制部分任务的并行性
  • task_concurrency:每个任务级别的并发限制

例:

t1 = BaseOperator(pool='my_custom_pool', task_concurrency=12)
Run Code Online (Sandbox Code Playgroud)

在整个Airflow设置中指定的选项:

  • core.parallelism:整个Airflow安装中运行的最大任务数
  • core.dag_concurrency: max number of tasks that can be running per DAG (across multiple DAG runs)
  • core.non_pooled_task_slot_count: number of task slots allocated to tasks not running in a pool
  • core.max_active_runs_per_dag: maximum number of active DAG runs, per DAG
  • scheduler.max_threads: how many threads the scheduler process should use to use to schedule DAGs
  • celery.worker_concurrency: number of task instances that a worker will take if using CeleryExecutor
  • celery.sync_parallelism: number of processes CeleryExecutor should use to sync task state

  • 好答案,非常感谢!对所有与并发性相关的选项的清晰说明。 (2认同)