Running tasks in parallel in Apache Airflow

Mit*_*Mit 8 airflow

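I was able to configure the airflow.cfg file to run tasks one after another.

What I want to do is execute tasks in parallel, e.g. 2 tasks at a time, until the end of the list is reached.

How can I configure this?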

Tay*_*ton 11

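Running tasks in parallel in Airflow depends on which executor you are using, e.g. SequentialExecutor, LocalExecutor, CeleryExecutor, etc.

For a simple setup, you can achieve parallelism just by setting the executor to LocalExecutor in airflow.cfg: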

[core]
executor = LocalExecutor

Reference: https://github.com/apache/incubator-airflow/blob/29ae02a070132543ac92706d74d9a5dc676053d9/airflow/config_templates/default_airflow.cfg#L76

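This will spin up a separate process for each task.

(Of course, you will need a DAG with at least two tasks that can execute in parallel to see it work.)

Alternatively, with CeleryExecutor, you can spin up any number of workers by running the following command (as many times as you want):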

$ airflow worker

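Tasks will go into a Celery queue, and each Celery worker will pull tasks off the queue.

You may find the section on scaling out with Celery in the Airflow configuration docs helpful: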

https://airflow.apache.org/howto/executor/use-celery.html

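For any executor, once you have it running, you may want to tweak the core settings that control parallelism.

They are all found under [core]. These are the defaults: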

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

Reference: https://github.com/apache/incubator-airflow/blob/29ae02a070132543ac92706d74d9a5dc676053d9/airflow/config_templates/default_airflow.cfg#L99
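
To tie these settings back to the question (running 2 tasks at a time), here is a rough sketch of how the limits might combine. It uses only Python's standard-library configparser and makes a simplifying assumption: tasks from a single DAG that run in the default pool are capped by the smallest of parallelism, dag_concurrency and non_pooled_task_slot_count. The helper name effective_task_limit and the inline config excerpt are made up for illustration and are not part of Airflow. The real scheduler also takes pools, per-task limits and executor capacity into account.

```python
import configparser

# Hypothetical excerpt of airflow.cfg (not a complete file), with
# dag_concurrency lowered to 2 so that, under the assumption stated
# above, at most two tasks of one DAG run at the same time.
EXAMPLE_CFG = """
[core]
executor = LocalExecutor
parallelism = 32
dag_concurrency = 2
non_pooled_task_slot_count = 128
"""


def effective_task_limit(cfg_text):
    """Return the smallest of the three [core] limits.

    Illustrative helper only; this is not an Airflow API.
    """
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    core = parser["core"]
    return min(
        core.getint("parallelism"),
        core.getint("dag_concurrency"),
        core.getint("non_pooled_task_slot_count"),
    )


if __name__ == "__main__":
    # Prints the estimated cap for the excerpt above.
    print(effective_task_limit(EXAMPLE_CFG))
```

With the excerpt above the helper returns 2. In other words, lowering dag_concurrency (or parallelism, to cap the whole installation) to 2 is one plausible way to get the "two at a time" behaviour, provided the executor is not SequentialExecutor.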