Airflow Scheduler 内存不足问题

FER*_*UZA 4 airflow airflow-scheduler

我们正在试验 Apache Airflow(版本 1.10rc2,使用 python 2.7)并将其部署到 kubernetes、webserver 和调度程序到不同的 pods,并且数据库也在使用 cloud sql,但是我们一直面临调度程序内存不足的问题荚。

在 OOM 的那一刻,我们只运行了 4 个示例 Dag(大约 20 个任务)。Pod 的内存为 1Gib。我在其他帖子中看到,一个任务在运行时可能会消耗大约 50Mib 的内存,并且所有任务操作都在内存中,没有任何内容刷新到磁盘,因此已经提供了 1Gb。

是否有任何经验法则可以用来计算基于并行任务的调度程序需要多少内存?

除了降低并行度之外,是否有任何调整可以减少调度程序本身的内存使用?

我认为我们的用例不需要 Dask 或 Celery 为工人使用更多机器水平扩展 Airflow。

关于配置的更多细节:

executor = Localexecutor
parallelism = 10
dag_concurrency = 5
max_active_runs_per_dag = 2
workers = 1
worker_concurrency = 16
min_file_process_interval = 1
min_file_parsing_loop_time = 5
dag_dir_list_interval = 30

当时运行的 dag 是 example_bash_operator、example_branch_operator、example_python_operator 和我们开发的一个 quickDag。

在某些情况下,所有这些都只是简单的任务/操作符,如 DummyOperators、BranchOperatos、BashOperators,但只执行 echo 或 sleep 并且 PythonOperators 也只执行 sleep 。总共大约有 40 个任务,但并非所有任务都并行运行,因为其中一些是下游、依赖等,我们的并行度设置为 10,如上所述只有一个工作人员,并且dag_concurrency设置为到 5。

我在气流日志中看不到任何异常,在任务日志中也看不到任何异常。

只运行这些 dag 中的一个,似乎气流正在相应地工作。

我可以在调度程序 pod 中看到很多调度程序进程,每个进程使用 0.2% 或更多的内存:

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
461384 airflow 20 0 836700 127212 23908 S 36.5 0.4 0:01.19 /usr/bin/python /usr/bin/airflow scheduler 461397 airflow 20 0 356168 86320 5044 R 14.0 0.3 0:00.42 /usr/bin/python /usr/bin/airflow scheduler 44 airflow 20 0 335920 71700 10600 S 28.9 0.2 403:32.05 /usr/bin/python /usr/bin/airflow scheduler 56 airflow 20 0 330548 59164 3524 S 0.0 0.2 0:00.02

这是使用 0.3% 内存运行的任务之一:

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 462042 airflow 20 0 282632 91120 10544 S 1.7 0.3 0:02.66 /usr/bin/python /usr/bin/airflow run example_bash_operator runme_1 2018-08-29T07:39:48.193735+00:00 --local -sd /usr/lib/python2.7/site-packages/apache_airflow-1.10.0-py2.7.egg/airflow/example_dags/example_bash_operator.py

cwu*_*rtz 5

实际上并没有一个简明的经验法则可以遵循,因为它可能会因您的工作流程而异。

如您所见,调度程序将创建多个 fork 进程。此外,每个任务(Dummy 除外)都将在其自己的进程中运行。根据操作员和它正在处理的数据,每个任务所需的内存量可能会有很大差异。

并行度设置将直接限制在所有 dag 运行/任务中同时运行的任务数量,这对使用 LocalExecutor 的您将产生最显着的效果。您也可以尝试将max_threads下设置[scheduler]为 1。

因此,一个(非常)普遍的经验法则是善待资源:

[256 for scheduler itself] + ( [parallelism] * (100MB + [size of data you'll process]) )
Run Code Online (Sandbox Code Playgroud)

数据大小需要更改的位置取决于您是加载完整数据集还是在任务执行过程中处理它的块。

即使您认为不需要扩展集群,我仍然建议使用 CeleryExecutor,如果只是为了将调度程序和任务彼此隔离。这样,如果您的调度程序或 celery 工作人员死了,它不会同时中断。特别是在 k8 中运行时,如果您的调度程序 sigterms 它将与任何正在运行的任务一起杀死它。如果您在不同的 pod 中运行它们并且调度程序 pod 重新启动,则您可以不间断地完成任务。如果你有更多的工人,它会减少来自其他任务的内存/处理尖峰的影响。