气流平行度

sid*_*607 34 airflow

本地执行程序在计划任务时生成新进程.它创建的进程数量是否有限制.我需要改变它.我需要知道airflow.cfg中调度程序的"max_threads"和"parallelism"之间有什么区别?

Rog*_*ger 64

并行性:不是一个非常具有描述性的名称.描述说它设置了气流安装的最大任务实例,这有点模棱两可 - 如果我有两个运行气流工作的主机,我将在两台主机上安装气流,所以这应该是两个安装,但基于上下文"每次安装"在这里表示"每个Airflow状态数据库".我将此命名为max_active_tasks.

dag_concurrency:尽管基于注释的名称,这实际上是任务并发性,而且是每个工作者.我将此命名为max_active_tasks_for_worker(per_worker会建议它是工作人员的全局设置,但我认为您可以为此设置不同值的工作人员).

max_active_runs_per_dag:这个有点好,但是因为它似乎只是匹配DAG kwarg的默认值,所以在名称中反映出来可能会很好,比如default_max_active_runs_for_dags所以让我们转到DAG kwargs:

并发性:同样,拥有这样的通用名称,再加上并发用于其他地方的事实使得这非常令人困惑.我叫这个max_active_tasks.

max_active_runs:这个听起来不错.

来源:https://issues.apache.org/jira/browse/AIRFLOW-57


max_threads为用户提供了对cpu使用的一些控制.它指定调度程序并行性.

  • 有没有一种方法可以为每个任务指定并行性?我发现当回填诸如从SFTP下载数据之类的内容时,我希望并行度为4或5。但是,当我加载数据时,我希望其仅为1(如果大于1,则为订单数据)。不能保证没有加载。现在我有SERIAL键,这些键混乱了,因为我忘记了将并行度恢复为1,这有点烦人) (4认同)
  • 来自airlow [文档](https://airflow.apache.org/faq.html#how-can-my-airflow-dag-run-faster):**并发**:Airflow调度程序的运行次数不超过*在任何给定时间为您的DAG提供* $ concurrency **任务实例。并发在您的Airflow DAG中定义。如果未在DAG上设置并发,则调度程序将使用airflow.cfg中** dag_concurrency **条目中的默认值。我了解dag_concurrency =默认并发(将在不设置并发时使用) (2认同)

gun*_*nit 17

现在是 2019 年,更多更新的文档已经出来。简而言之:

AIRFLOW__CORE__PARALLELISM 是可以跨所有 Airflow 并发运行的最大任务实例数(跨所有 dag 的所有任务)

AIRFLOW__CORE__DAG_CONCURRENCY 是单个特定 DAG 允许并发运行的最大任务实例数

这些文档更详细地描述了它:

根据https://www.astronomer.io/guides/airflow-scaling-workers/

并行度是可以在气流上同时运行的任务实例的最大数量。这意味着在所有正在运行的 DAG 中,一次运行的任务不超过 32 个。

dag_concurrency 是允许在特定 dag 内并发运行的任务实例数。换句话说,你可以有 2 个 DAG,每个 DAG 并行运行 16 个任务,但一个有 50 个任务的 DAG 也只能运行 16 个任务 - 而不是 32

而且,根据https://airflow.apache.org/faq.html#how-to-reduce-airflow-dag-scheduling-latency-in-production

max_threads:调度程序将并行生成多个线程来调度 dag。这由 max_threads 控制,默认值为 2。在生产中,用户应该将此值增加到更大的值(例如调度程序运行的 CPU 数量 - 1)。

但似乎这最后一块不应该占用太多时间,因为它只是“调度”部分。不是实际的运行部分。因此,我们没有看到需要调整max_threads很多,但AIRFLOW__CORE__PARALLELISMAIRFLOW__CORE__DAG_CONCURRENCY没有影响到我们。


Vin*_*oel 13

调度max_threads程序是并行调度程序的进程数.该max_threads不能超过CPU计数.LocalExecutor parallelism是LocalExecutor应该运行的并发任务数.调度程序和LocalExecutor都使用python的多处理库进行并行处理.