Kev*_* Li 12 directed-acyclic-graphs airflow airflow-scheduler
最近,我将 Airflow 从 1.9 升级到 1.10.3(最新版本)。
但是,我确实注意到与 SubDag 并发相关的性能问题。SubDag 中只能拾取 1 个任务,这不是应该的方式,我们对 SubDag 的并发设置为 8。
见下:
get_monthly_summary-214和get_monthly_summary-215是两个SubDag,可以由父dag并发在controller中并行运行
但是当放大 SubDag 时说get_monthly_summary-214,然后
你肯定可以看到一次只有 1 个任务在运行,其他的都在排队,一直这样运行。当我们检查SubDag并发时,实际上是我们在代码中指定的8:

我们确实设置了池槽大小,它是 32,我们确实有 8 个 celery worker 来接收排队的任务,我们与并发关联的气流配置如下:
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor
# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16
Run Code Online (Sandbox Code Playgroud)
此外,所有 SubDag 都使用名为 的队列进行配置mini,而其所有内部任务都是名为 的默认队列default,因为如果我们在同一队列上同时运行 SubDag 运算符和 SubDag 内部任务,我们可能会出现一些死锁问题。我还尝试将default队列用于所有任务和操作员,但无济于事。
旧版本 1.9 似乎很好,每个 SubDag 可以并行执行多个任务,我们错过了什么吗?
Kev*_* Li 10
根据上面发布的 @kaxil 的发现,如果您仍然希望并行执行 subdag 内的任务,则解决方案是创建一个包装函数来显式传递executorwhen 构造SubDagOperator:
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor
def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs)
Run Code Online (Sandbox Code Playgroud)
sub_dag_operator_with_default_executor创建 subdag 运算符时调用。为了减轻 sub dag 运营商的性能担忧
我们应该将 subdag_operator 的默认执行器更改为 SequentialExecutor。气流池不受 subdagooperator 的支持,因此它可能会消耗所有工作人员资源(例如在 celeryExecutor 中)。这会导致 airflow-74 中提到的问题并限制 subdag_operator 的使用。我们通过指定使用顺序执行器在生产中使用 subdag_operator。
我们建议创建一个特殊的队列(在我们的例子中我们指定 queue='mini')和 celery worker 来处理 subdag_operator,这样它就不会消耗你所有的普通 celery worker 的资源。如下:
dag = DAG(
dag_id=DAG_NAME,
description=f"{DAG_NAME}-{__version__}",
...
)
with dag:
ur_operator = sub_dag_operator_with_default_executor(
task_id=f"your_task_id",
subdag=load_sub_dag(
parent_dag_name=DAG_NAME,
child_dag_name=f"your_child_dag_name",
args=args,
concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
),
queue="mini",
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
然后当你创建你的特殊 celery worker(我们使用像 2 核和 3G 内存的轻量级主机)时,指定AIRFLOW__CELERY__DEFAULT_QUEUEas mini,取决于你想要并行运行多少子 dag 操作符,你应该创建多个特殊的 celery worker负载均衡资源,我们建议每个特殊的celery worker一次最多处理2个sub dag operator,否则会被耗尽(例如,在2核3G内存的主机上内存不足)
您也可以concurrency通过concurrency_in_sub_dag在气流 UIVariables配置页面中创建的 ENV VAR 在您的 subdag 内部进行调整。
更新 [22/05/2020] 以上仅适用于气流 (<=1.10.3, >= 1.10.0) 对于 1.10.3 以上的气流,请使用
from airflow.executors import get_default_executor
Run Code Online (Sandbox Code Playgroud)
反而。
这是因为在 Airflow 1.9.0 中,SubdagOperator 使用了默认的 Executor。
气流 1.9.0:https : //github.com/apache/airflow/blob/1.9.0/airflow/operators/subdag_operator.py#L33
class SubDagOperator(BaseOperator):
template_fields = tuple()
ui_color = '#555'
ui_fgcolor = '#fff'
@provide_session
@apply_defaults
def __init__(
self,
subdag,
executor=GetDefaultExecutor(),
*args, **kwargs):
Run Code Online (Sandbox Code Playgroud)
但是,从 Airflow 1.10 开始,SubDagOperator 的默认执行器更改为 SequentialExecutor
气流 >=1.10 : https://github.com/apache/airflow/blob/1.10.0/airflow/operators/subdag_operator.py#L38
class SubDagOperator(BaseOperator):
template_fields = tuple()
ui_color = '#555'
ui_fgcolor = '#fff'
@provide_session
@apply_defaults
def __init__(
self,
subdag,
executor=SequentialExecutor(),
*args, **kwargs):
Run Code Online (Sandbox Code Playgroud)
更改的详细原因可以在https://github.com/apache/airflow/pull/3251 中找到
我们应该将 subdag_operator 的默认执行器更改为 SequentialExecutor。气流池不受 subdagooperator 的支持,因此它可能会消耗所有工作人员资源(例如在 celeryExecutor 中)。这会导致 airflow-74 中提到的问题并限制 subdag_operator 的使用。我们通过指定使用顺序执行器在生产中使用 subdag_operator。
| 归档时间: |
|
| 查看次数: |
6372 次 |
| 最近记录: |